diff options
Diffstat (limited to 'tests/unittests/sources/test_ibmcloud.py')
-rw-r--r-- | tests/unittests/sources/test_ibmcloud.py | 426 |
1 files changed, 426 insertions, 0 deletions
diff --git a/tests/unittests/sources/test_ibmcloud.py b/tests/unittests/sources/test_ibmcloud.py new file mode 100644 index 00000000..17a8be64 --- /dev/null +++ b/tests/unittests/sources/test_ibmcloud.py @@ -0,0 +1,426 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import base64 +import copy +import json +from textwrap import dedent + +from cloudinit import util +from cloudinit.helpers import Paths +from cloudinit.sources import DataSourceIBMCloud as ibm +from tests.unittests import helpers as test_helpers + +mock = test_helpers.mock + +D_PATH = "cloudinit.sources.DataSourceIBMCloud." + + +@mock.patch(D_PATH + "_is_xen", return_value=True) +@mock.patch(D_PATH + "_is_ibm_provisioning") +@mock.patch(D_PATH + "util.blkid") +class TestGetIBMPlatform(test_helpers.CiTestCase): + """Test the get_ibm_platform helper.""" + + blkid_base = { + "/dev/xvda1": { + "DEVNAME": "/dev/xvda1", + "LABEL": "cloudimg-bootfs", + "TYPE": "ext3", + }, + "/dev/xvda2": { + "DEVNAME": "/dev/xvda2", + "LABEL": "cloudimg-rootfs", + "TYPE": "ext4", + }, + } + + blkid_metadata_disk = { + "/dev/xvdh1": { + "DEVNAME": "/dev/xvdh1", + "LABEL": "METADATA", + "TYPE": "vfat", + "SEC_TYPE": "msdos", + "UUID": "681B-8C5D", + "PARTUUID": "3d631e09-01", + }, + } + + blkid_oscode_disk = { + "/dev/xvdh": { + "DEVNAME": "/dev/xvdh", + "LABEL": "config-2", + "TYPE": "vfat", + "SEC_TYPE": "msdos", + "UUID": ibm.IBM_CONFIG_UUID, + } + } + + def setUp(self): + self.blkid_metadata = copy.deepcopy(self.blkid_base) + self.blkid_metadata.update(copy.deepcopy(self.blkid_metadata_disk)) + + self.blkid_oscode = copy.deepcopy(self.blkid_base) + self.blkid_oscode.update(copy.deepcopy(self.blkid_oscode_disk)) + + def test_id_template_live_metadata(self, m_blkid, m_is_prov, _m_xen): + """identify TEMPLATE_LIVE_METADATA.""" + m_blkid.return_value = self.blkid_metadata + m_is_prov.return_value = False + self.assertEqual( + (ibm.Platforms.TEMPLATE_LIVE_METADATA, "/dev/xvdh1"), + ibm.get_ibm_platform(), + ) + + def test_id_template_prov_metadata(self, m_blkid, m_is_prov, _m_xen): + """identify TEMPLATE_PROVISIONING_METADATA.""" + m_blkid.return_value = self.blkid_metadata + m_is_prov.return_value = True + self.assertEqual( + (ibm.Platforms.TEMPLATE_PROVISIONING_METADATA, "/dev/xvdh1"), + ibm.get_ibm_platform(), + ) + + def test_id_template_prov_nodata(self, m_blkid, m_is_prov, _m_xen): + """identify TEMPLATE_PROVISIONING_NODATA.""" + m_blkid.return_value = self.blkid_base + m_is_prov.return_value = True + self.assertEqual( + (ibm.Platforms.TEMPLATE_PROVISIONING_NODATA, None), + ibm.get_ibm_platform(), + ) + + def test_id_os_code(self, m_blkid, m_is_prov, _m_xen): + """Identify OS_CODE.""" + m_blkid.return_value = self.blkid_oscode + m_is_prov.return_value = False + self.assertEqual( + (ibm.Platforms.OS_CODE, "/dev/xvdh"), ibm.get_ibm_platform() + ) + + def test_id_os_code_must_match_uuid(self, m_blkid, m_is_prov, _m_xen): + """Test against false positive on openstack with non-ibm UUID.""" + blkid = self.blkid_oscode + blkid["/dev/xvdh"]["UUID"] = "9999-9999" + m_blkid.return_value = blkid + m_is_prov.return_value = False + self.assertEqual((None, None), ibm.get_ibm_platform()) + + +@mock.patch(D_PATH + "_read_system_uuid", return_value=None) +@mock.patch(D_PATH + "get_ibm_platform") +class TestReadMD(test_helpers.CiTestCase): + """Test the read_datasource helper.""" + + template_md = { + "files": [], + "network_config": {"content_path": "/content/interfaces"}, + "hostname": "ci-fond-ram", + "name": "ci-fond-ram", + "domain": "testing.ci.cloud-init.org", + "meta": {"dsmode": "net"}, + "uuid": "8e636730-9f5d-c4a5-327c-d7123c46e82f", + "public_keys": {"1091307": "ssh-rsa AAAAB3NzaC1...Hw== ci-pubkey"}, + } + + oscode_md = { + "hostname": "ci-grand-gannet.testing.ci.cloud-init.org", + "name": "ci-grand-gannet", + "uuid": "2f266908-8e6c-4818-9b5c-42e9cc66a785", + "random_seed": "bm90LXJhbmRvbQo=", + "crypt_key": "ssh-rsa AAAAB3NzaC1yc2..n6z/", + "configuration_token": "eyJhbGciOi..M3ZA", + "public_keys": {"1091307": "ssh-rsa AAAAB3N..Hw== ci-pubkey"}, + } + + content_interfaces = dedent( + """\ + auto lo + iface lo inet loopback + + auto eth0 + allow-hotplug eth0 + iface eth0 inet static + address 10.82.43.5 + netmask 255.255.255.192 + """ + ) + + userdata = b"#!/bin/sh\necho hi mom\n" + # meta.js file gets json encoded userdata as a list. + meta_js = '["#!/bin/sh\necho hi mom\n"]' + vendor_data = { + "cloud-init": "#!/bin/bash\necho 'root:$6$5ab01p1m1' | chpasswd -e" + } + + network_data = { + "links": [ + { + "id": "interface_29402281", + "name": "eth0", + "mtu": None, + "type": "phy", + "ethernet_mac_address": "06:00:f1:bd:da:25", + }, + { + "id": "interface_29402279", + "name": "eth1", + "mtu": None, + "type": "phy", + "ethernet_mac_address": "06:98:5e:d0:7f:86", + }, + ], + "networks": [ + { + "id": "network_109887563", + "link": "interface_29402281", + "type": "ipv4", + "ip_address": "10.82.43.2", + "netmask": "255.255.255.192", + "routes": [ + { + "network": "10.0.0.0", + "netmask": "255.0.0.0", + "gateway": "10.82.43.1", + }, + { + "network": "161.26.0.0", + "netmask": "255.255.0.0", + "gateway": "10.82.43.1", + }, + ], + }, + { + "id": "network_109887551", + "link": "interface_29402279", + "type": "ipv4", + "ip_address": "108.168.194.252", + "netmask": "255.255.255.248", + "routes": [ + { + "network": "0.0.0.0", + "netmask": "0.0.0.0", + "gateway": "108.168.194.249", + } + ], + }, + ], + "services": [ + {"type": "dns", "address": "10.0.80.11"}, + {"type": "dns", "address": "10.0.80.12"}, + ], + } + + sysuuid = "7f79ebf5-d791-43c3-a723-854e8389d59f" + + def _get_expected_metadata(self, os_md): + """return expected 'metadata' for data loaded from meta_data.json.""" + os_md = copy.deepcopy(os_md) + renames = ( + ("hostname", "local-hostname"), + ("uuid", "instance-id"), + ("public_keys", "public-keys"), + ) + ret = {} + for osname, mdname in renames: + if osname in os_md: + ret[mdname] = os_md[osname] + if "random_seed" in os_md: + ret["random_seed"] = base64.b64decode(os_md["random_seed"]) + + return ret + + def test_provisioning_md(self, m_platform, m_sysuuid): + """Provisioning env with a metadata disk should return None.""" + m_platform.return_value = ( + ibm.Platforms.TEMPLATE_PROVISIONING_METADATA, + "/dev/xvdh", + ) + self.assertIsNone(ibm.read_md()) + + def test_provisioning_no_metadata(self, m_platform, m_sysuuid): + """Provisioning env with no metadata disk should return None.""" + m_platform.return_value = ( + ibm.Platforms.TEMPLATE_PROVISIONING_NODATA, + None, + ) + self.assertIsNone(ibm.read_md()) + + def test_provisioning_not_ibm(self, m_platform, m_sysuuid): + """Provisioning env but not identified as IBM should return None.""" + m_platform.return_value = (None, None) + self.assertIsNone(ibm.read_md()) + + def test_template_live(self, m_platform, m_sysuuid): + """Template live environment should be identified.""" + tmpdir = self.tmp_dir() + m_platform.return_value = ( + ibm.Platforms.TEMPLATE_LIVE_METADATA, + tmpdir, + ) + m_sysuuid.return_value = self.sysuuid + + test_helpers.populate_dir( + tmpdir, + { + "openstack/latest/meta_data.json": json.dumps( + self.template_md + ), + "openstack/latest/user_data": self.userdata, + "openstack/content/interfaces": self.content_interfaces, + "meta.js": self.meta_js, + }, + ) + + ret = ibm.read_md() + self.assertEqual(ibm.Platforms.TEMPLATE_LIVE_METADATA, ret["platform"]) + self.assertEqual(tmpdir, ret["source"]) + self.assertEqual(self.userdata, ret["userdata"]) + self.assertEqual( + self._get_expected_metadata(self.template_md), ret["metadata"] + ) + self.assertEqual(self.sysuuid, ret["system-uuid"]) + + def test_os_code_live(self, m_platform, m_sysuuid): + """Verify an os_code metadata path.""" + tmpdir = self.tmp_dir() + m_platform.return_value = (ibm.Platforms.OS_CODE, tmpdir) + netdata = json.dumps(self.network_data) + test_helpers.populate_dir( + tmpdir, + { + "openstack/latest/meta_data.json": json.dumps(self.oscode_md), + "openstack/latest/user_data": self.userdata, + "openstack/latest/vendor_data.json": json.dumps( + self.vendor_data + ), + "openstack/latest/network_data.json": netdata, + }, + ) + + ret = ibm.read_md() + self.assertEqual(ibm.Platforms.OS_CODE, ret["platform"]) + self.assertEqual(tmpdir, ret["source"]) + self.assertEqual(self.userdata, ret["userdata"]) + self.assertEqual( + self._get_expected_metadata(self.oscode_md), ret["metadata"] + ) + + def test_os_code_live_no_userdata(self, m_platform, m_sysuuid): + """Verify os_code without user-data.""" + tmpdir = self.tmp_dir() + m_platform.return_value = (ibm.Platforms.OS_CODE, tmpdir) + test_helpers.populate_dir( + tmpdir, + { + "openstack/latest/meta_data.json": json.dumps(self.oscode_md), + "openstack/latest/vendor_data.json": json.dumps( + self.vendor_data + ), + }, + ) + + ret = ibm.read_md() + self.assertEqual(ibm.Platforms.OS_CODE, ret["platform"]) + self.assertEqual(tmpdir, ret["source"]) + self.assertIsNone(ret["userdata"]) + self.assertEqual( + self._get_expected_metadata(self.oscode_md), ret["metadata"] + ) + + +class TestIsIBMProvisioning(test_helpers.FilesystemMockingTestCase): + """Test the _is_ibm_provisioning method.""" + + inst_log = "/root/swinstall.log" + prov_cfg = "/root/provisioningConfiguration.cfg" + boot_ref = "/proc/1/environ" + with_logs = True + + def _call_with_root(self, rootd): + self.reRoot(rootd) + return ibm._is_ibm_provisioning() + + def test_no_config(self): + """No provisioning config means not provisioning.""" + self.assertFalse(self._call_with_root(self.tmp_dir())) + + def test_config_only(self): + """A provisioning config without a log means provisioning.""" + rootd = self.tmp_dir() + test_helpers.populate_dir(rootd, {self.prov_cfg: "key=value"}) + self.assertTrue(self._call_with_root(rootd)) + + def test_config_with_old_log(self): + """A config with a log from previous boot is not provisioning.""" + rootd = self.tmp_dir() + data = { + self.prov_cfg: ("key=value\nkey2=val2\n", -10), + self.inst_log: ("log data\n", -30), + self.boot_ref: ("PWD=/", 0), + } + test_helpers.populate_dir_with_ts(rootd, data) + self.assertFalse(self._call_with_root(rootd=rootd)) + self.assertIn("from previous boot", self.logs.getvalue()) + + def test_config_with_new_log(self): + """A config with a log from this boot is provisioning.""" + rootd = self.tmp_dir() + data = { + self.prov_cfg: ("key=value\nkey2=val2\n", -10), + self.inst_log: ("log data\n", 30), + self.boot_ref: ("PWD=/", 0), + } + test_helpers.populate_dir_with_ts(rootd, data) + self.assertTrue(self._call_with_root(rootd=rootd)) + self.assertIn("from current boot", self.logs.getvalue()) + + def test_config_and_log_no_reference(self): + """If the config and log existed, but no reference, assume not.""" + rootd = self.tmp_dir() + test_helpers.populate_dir( + rootd, {self.prov_cfg: "key=value", self.inst_log: "log data\n"} + ) + self.assertFalse(self._call_with_root(rootd=rootd)) + self.assertIn("no reference file", self.logs.getvalue()) + + +class TestDataSourceIBMCloud(test_helpers.CiTestCase): + def setUp(self): + super(TestDataSourceIBMCloud, self).setUp() + self.tmp = self.tmp_dir() + self.cloud_dir = self.tmp_path("cloud", dir=self.tmp) + util.ensure_dir(self.cloud_dir) + paths = Paths({"run_dir": self.tmp, "cloud_dir": self.cloud_dir}) + self.ds = ibm.DataSourceIBMCloud(sys_cfg={}, distro=None, paths=paths) + + def test_get_data_false(self): + """When read_md returns None, get_data returns False.""" + with mock.patch(D_PATH + "read_md", return_value=None): + self.assertFalse(self.ds.get_data()) + + def test_get_data_processes_read_md(self): + """get_data processes and caches content returned by read_md.""" + md = { + "metadata": {}, + "networkdata": "net", + "platform": "plat", + "source": "src", + "system-uuid": "uuid", + "userdata": "ud", + "vendordata": "vd", + } + with mock.patch(D_PATH + "read_md", return_value=md): + self.assertTrue(self.ds.get_data()) + self.assertEqual("src", self.ds.source) + self.assertEqual("plat", self.ds.platform) + self.assertEqual({}, self.ds.metadata) + self.assertEqual("ud", self.ds.userdata_raw) + self.assertEqual("net", self.ds.network_json) + self.assertEqual("vd", self.ds.vendordata_pure) + self.assertEqual("uuid", self.ds.system_uuid) + self.assertEqual("ibmcloud", self.ds.cloud_name) + self.assertEqual("ibmcloud", self.ds.platform_type) + self.assertEqual("plat (src)", self.ds.subplatform) + + +# vi: ts=4 expandtab |