 cloudinit/sources/DataSourceOracle.py  | 178
 cloudinit/sources/tests/test_oracle.py | 776
 conftest.py                            | 116
 tox.ini                                |   1
 4 files changed, 538 insertions(+), 533 deletions(-)
diff --git a/cloudinit/sources/DataSourceOracle.py b/cloudinit/sources/DataSourceOracle.py
index 90e1881a..f113d364 100644
--- a/cloudinit/sources/DataSourceOracle.py
+++ b/cloudinit/sources/DataSourceOracle.py
@@ -1,22 +1,20 @@
# This file is part of cloud-init. See LICENSE file for license information.
"""Datasource for Oracle (OCI/Oracle Cloud Infrastructure)
-OCI provides a OpenStack like metadata service which provides only
-'2013-10-17' and 'latest' versions..
-
Notes:
- * This datasource does not support the OCI-Classic. OCI-Classic
- provides an EC2 lookalike metadata service.
- * The uuid provided in DMI data is not the same as the meta-data provided
+ * This datasource does not support OCI Classic. OCI Classic provides an EC2
+ lookalike metadata service.
+ * The UUID provided in DMI data is not the same as the meta-data provided
instance-id, but has an equivalent lifespan.
* We do need to support upgrade from an instance that cloud-init
identified as OpenStack.
- * Both bare-metal and vms use iscsi root
- * Both bare-metal and vms provide chassis-asset-tag of OracleCloud.com
+ * Bare metal instances use iSCSI root, virtual machine instances do not.
+ * Both bare metal and virtual machine instances provide a chassis-asset-tag of
+ OracleCloud.com.
"""
+import base64
import json
-import re
from cloudinit import log as logging
from cloudinit import net, sources, util
@@ -26,7 +24,7 @@ from cloudinit.net import (
get_interfaces_by_mac,
is_netfail_master,
)
-from cloudinit.url_helper import UrlError, combine_url, readurl
+from cloudinit.url_helper import readurl
LOG = logging.getLogger(__name__)
@@ -35,8 +33,9 @@ BUILTIN_DS_CONFIG = {
'configure_secondary_nics': False,
}
CHASSIS_ASSET_TAG = "OracleCloud.com"
-METADATA_ENDPOINT = "http://169.254.169.254/openstack/"
-VNIC_METADATA_URL = 'http://169.254.169.254/opc/v1/vnics/'
+METADATA_ROOT = "http://169.254.169.254/opc/v1/"
+METADATA_ENDPOINT = METADATA_ROOT + "instance/"
+VNIC_METADATA_URL = METADATA_ROOT + "vnics/"
# https://docs.cloud.oracle.com/iaas/Content/Network/Troubleshoot/connectionhang.htm#Overview,
# indicates that an MTU of 9000 is used within OCI
MTU = 9000
@@ -189,53 +188,39 @@ class DataSourceOracle(sources.DataSource):
if not self._is_platform_viable():
return False
+ self.system_uuid = _read_system_uuid()
+
# network may be configured if iscsi root. If that is the case
# then read_initramfs_config will return non-None.
if _is_iscsi_root():
- data = self.crawl_metadata()
+ data = read_opc_metadata()
else:
with dhcp.EphemeralDHCPv4(net.find_fallback_nic()):
- data = self.crawl_metadata()
+ data = read_opc_metadata()
self._crawled_metadata = data
- vdata = data['2013-10-17']
-
- self.userdata_raw = vdata.get('user_data')
- self.system_uuid = vdata['system_uuid']
-
- vd = vdata.get('vendor_data')
- if vd:
- self.vendordata_pure = vd
- try:
- self.vendordata_raw = sources.convert_vendordata(vd)
- except ValueError as e:
- LOG.warning("Invalid content in vendor-data: %s", e)
- self.vendordata_raw = None
-
- mdcopies = ('public_keys',)
- md = dict([(k, vdata['meta_data'].get(k))
- for k in mdcopies if k in vdata['meta_data']])
-
- mdtrans = (
- # oracle meta_data.json name, cloudinit.datasource.metadata name
- ('availability_zone', 'availability-zone'),
- ('hostname', 'local-hostname'),
- ('launch_index', 'launch-index'),
- ('uuid', 'instance-id'),
- )
- for dsname, ciname in mdtrans:
- if dsname in vdata['meta_data']:
- md[ciname] = vdata['meta_data'][dsname]
- self.metadata = md
- return True
+ self.metadata = {
+ "availability-zone": data["ociAdName"],
+ "instance-id": data["id"],
+ "launch-index": 0,
+ "local-hostname": data["hostname"],
+ "name": data["displayName"],
+ }
+
+ if "metadata" in data:
+ user_data = data["metadata"].get("user_data")
+ if user_data:
+ self.userdata_raw = base64.b64decode(user_data)
+ self.metadata["public_keys"] = data["metadata"].get(
+ "ssh_authorized_keys"
+ )
- def crawl_metadata(self):
- return read_metadata()
+ return True
def _get_subplatform(self):
"""Return the subplatform metadata source details."""
- return 'metadata (%s)' % METADATA_ENDPOINT
+ return "metadata ({})".format(METADATA_ROOT)
def check_instance_id(self, sys_cfg):
"""quickly check (local only) if self.instance_id is still valid
@@ -292,72 +277,15 @@ def _is_iscsi_root():
return bool(cmdline.read_initramfs_config())
-def _load_index(content):
- """Return a list entries parsed from content.
-
- OpenStack's metadata service returns a newline delimited list
- of items. Oracle's implementation has html formatted list of links.
- The parser here just grabs targets from <a href="target">
- and throws away "../".
-
- Oracle has accepted that to be buggy and may fix in the future
- to instead return a '\n' delimited plain text list. This function
- will continue to work if that change is made."""
- if not content.lower().startswith("<html>"):
- return content.splitlines()
- items = re.findall(
- r'href="(?P<target>[^"]*)"', content, re.MULTILINE | re.IGNORECASE)
- return [i for i in items if not i.startswith(".")]
-
-
-def read_metadata(endpoint_base=METADATA_ENDPOINT, sys_uuid=None,
- version='2013-10-17'):
- """Read metadata, return a dictionary.
-
- Each path listed in the index will be represented in the dictionary.
- If the path ends in .json, then the content will be decoded and
- populated into the dictionary.
-
- The system uuid (/sys/class/dmi/id/product_uuid) is also populated.
- Example: given paths = ('user_data', 'meta_data.json')
- This would return:
- {version: {'user_data': b'blob', 'meta_data': json.loads(blob.decode())
- 'system_uuid': '3b54f2e0-3ab2-458d-b770-af9926eee3b2'}}
+def read_opc_metadata():
"""
- endpoint = combine_url(endpoint_base, version) + "/"
- if sys_uuid is None:
- sys_uuid = _read_system_uuid()
- if not sys_uuid:
- raise sources.BrokenMetadata("Failed to read system uuid.")
-
- try:
- resp = readurl(endpoint)
- if not resp.ok():
- raise sources.BrokenMetadata(
- "Bad response from %s: %s" % (endpoint, resp.code))
- except UrlError as e:
- raise sources.BrokenMetadata(
- "Failed to read index at %s: %s" % (endpoint, e))
-
- entries = _load_index(resp.contents.decode('utf-8'))
- LOG.debug("index url %s contained: %s", endpoint, entries)
-
- # meta_data.json is required.
- mdj = 'meta_data.json'
- if mdj not in entries:
- raise sources.BrokenMetadata(
- "Required field '%s' missing in index at %s" % (mdj, endpoint))
-
- ret = {'system_uuid': sys_uuid}
- for path in entries:
- response = readurl(combine_url(endpoint, path))
- if path.endswith(".json"):
- ret[path.rpartition(".")[0]] = (
- json.loads(response.contents.decode('utf-8')))
- else:
- ret[path] = response.contents
+ Fetch metadata from the /opc/ routes.
- return {version: ret}
+ :return:
+ The JSON-decoded value of the /opc/v1/instance/ endpoint on the IMDS.
+ """
+ # retries=1 as requested by Oracle to address a potential race condition
+ return json.loads(readurl(METADATA_ENDPOINT, retries=1)._response.text)
# Used to match classes to dependencies
@@ -373,17 +301,21 @@ def get_datasource_list(depends):
if __name__ == "__main__":
import argparse
- import os
-
- parser = argparse.ArgumentParser(description='Query Oracle Cloud Metadata')
- parser.add_argument("--endpoint", metavar="URL",
- help="The url of the metadata service.",
- default=METADATA_ENDPOINT)
- args = parser.parse_args()
- sys_uuid = "uuid-not-available-not-root" if os.geteuid() != 0 else None
-
- data = read_metadata(endpoint_base=args.endpoint, sys_uuid=sys_uuid)
- data['is_platform_viable'] = _is_platform_viable()
- print(util.json_dumps(data))
+
+ description = """
+ Query Oracle Cloud metadata and emit a JSON object with two keys:
+ `read_opc_metadata` and `_is_platform_viable`. The values of each are
+ the return values of the corresponding functions defined in
+ DataSourceOracle.py."""
+ parser = argparse.ArgumentParser(description=description)
+ parser.parse_args()
+ print(
+ util.json_dumps(
+ {
+ "read_opc_metadata": read_opc_metadata(),
+ "_is_platform_viable": _is_platform_viable(),
+ }
+ )
+ )
# vi: ts=4 expandtab
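
For readers skimming the patch, the new crawl path is small enough to sketch in isolation. The snippet below is illustrative only (it is not part of this commit, and it uses plain urllib where the datasource uses cloudinit.url_helper.readurl with retries=1); it shows how the /opc/v1/instance/ JSON maps onto the metadata keys and the base64-encoded user data that _get_data() now handles.

import base64
import json
from urllib.request import urlopen

IMDS_INSTANCE = "http://169.254.169.254/opc/v1/instance/"


def crawl_instance_metadata(url=IMDS_INSTANCE):
    """Illustrative sketch of the flow implemented by the patch above."""
    # JSON-decode the IMDS response, as read_opc_metadata() does.
    data = json.loads(urlopen(url).read().decode("utf-8"))
    metadata = {
        "availability-zone": data["ociAdName"],
        "instance-id": data["id"],
        "local-hostname": data["hostname"],
    }
    # user_data arrives base64-encoded under the top-level "metadata" key.
    user_data = data.get("metadata", {}).get("user_data")
    if user_data:
        user_data = base64.b64decode(user_data)
    return metadata, user_data
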
diff --git a/cloudinit/sources/tests/test_oracle.py b/cloudinit/sources/tests/test_oracle.py
index 2265327b..9ee6e7fa 100644
--- a/cloudinit/sources/tests/test_oracle.py
+++ b/cloudinit/sources/tests/test_oracle.py
@@ -1,23 +1,19 @@
# This file is part of cloud-init. See LICENSE file for license information.
-import argparse
+import base64
import copy
import json
-import os
-import uuid
-from textwrap import dedent
+from contextlib import ExitStack
from unittest import mock
-import httpretty
+import pytest
-from cloudinit import helpers
-from cloudinit.sources import BrokenMetadata
from cloudinit.sources import DataSourceOracle as oracle
from cloudinit.sources import NetworkConfigSource
from cloudinit.tests import helpers as test_helpers
+from cloudinit.url_helper import UrlError
DS_PATH = "cloudinit.sources.DataSourceOracle"
-MD_VER = "2013-10-17"
# `curl -L http://169.254.169.254/opc/v1/vnics/` on a Oracle Bare Metal Machine
# with a secondary VNIC attached (vnicId truncated for Python line length)
@@ -60,330 +56,80 @@ OPC_VM_SECONDARY_VNIC_RESPONSE = """\
} ]"""
-class TestDataSourceOracle(test_helpers.CiTestCase):
- """Test datasource DataSourceOracle."""
-
- with_logs = True
-
- ds_class = oracle.DataSourceOracle
-
- my_uuid = str(uuid.uuid4())
- my_md = {"uuid": "ocid1.instance.oc1.phx.abyhqlj",
- "name": "ci-vm1", "availability_zone": "phx-ad-3",
- "hostname": "ci-vm1hostname",
- "launch_index": 0, "files": [],
- "public_keys": {"0": "ssh-rsa AAAAB3N...== user@host"},
- "meta": {}}
-
- def _patch_instance(self, inst, patches):
- """Patch an instance of a class 'inst'.
- for each name, kwargs in patches:
- inst.name = mock.Mock(**kwargs)
- returns a namespace object that has
- namespace.name = mock.Mock(**kwargs)
- Do not bother with cleanup as instance is assumed transient."""
- mocks = argparse.Namespace()
- for name, kwargs in patches.items():
- imock = mock.Mock(name=name, spec=getattr(inst, name), **kwargs)
- setattr(mocks, name, imock)
- setattr(inst, name, imock)
- return mocks
-
- def _get_ds(self, sys_cfg=None, distro=None, paths=None, ud_proc=None,
- patches=None):
- if sys_cfg is None:
- sys_cfg = {}
- if patches is None:
- patches = {}
- if paths is None:
- tmpd = self.tmp_dir()
- dirs = {'cloud_dir': self.tmp_path('cloud_dir', tmpd),
- 'run_dir': self.tmp_path('run_dir')}
- for d in dirs.values():
- os.mkdir(d)
- paths = helpers.Paths(dirs)
-
- ds = self.ds_class(sys_cfg=sys_cfg, distro=distro,
- paths=paths, ud_proc=ud_proc)
-
- return ds, self._patch_instance(ds, patches)
-
- def test_platform_not_viable_returns_false(self):
- ds, mocks = self._get_ds(
- patches={'_is_platform_viable': {'return_value': False}})
- self.assertFalse(ds._get_data())
- mocks._is_platform_viable.assert_called_once_with()
-
- def test_platform_info(self):
- """Return platform-related information for Oracle Datasource."""
- ds, _mocks = self._get_ds()
- self.assertEqual('oracle', ds.cloud_name)
- self.assertEqual('oracle', ds.platform_type)
- self.assertEqual(
- 'metadata (http://169.254.169.254/openstack/)', ds.subplatform)
-
- def test_sys_cfg_can_enable_configure_secondary_nics(self):
- # Confirm that behaviour is toggled by sys_cfg
- ds, _mocks = self._get_ds()
- self.assertFalse(ds.ds_cfg['configure_secondary_nics'])
-
- sys_cfg = {
- 'datasource': {'Oracle': {'configure_secondary_nics': True}}}
- ds, _mocks = self._get_ds(sys_cfg=sys_cfg)
- self.assertTrue(ds.ds_cfg['configure_secondary_nics'])
-
- @mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
- def test_without_userdata(self, m_is_iscsi_root):
- """If no user-data is provided, it should not be in return dict."""
- ds, mocks = self._get_ds(patches={
- '_is_platform_viable': {'return_value': True},
- 'crawl_metadata': {
- 'return_value': {
- MD_VER: {'system_uuid': self.my_uuid,
- 'meta_data': self.my_md}}}})
- self.assertTrue(ds._get_data())
- mocks._is_platform_viable.assert_called_once_with()
- mocks.crawl_metadata.assert_called_once_with()
- self.assertEqual(self.my_uuid, ds.system_uuid)
- self.assertEqual(self.my_md['availability_zone'], ds.availability_zone)
- self.assertIn(self.my_md["public_keys"]["0"], ds.get_public_ssh_keys())
- self.assertEqual(self.my_md['uuid'], ds.get_instance_id())
- self.assertIsNone(ds.userdata_raw)
-
- @mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
- def test_with_vendordata(self, m_is_iscsi_root):
- """Test with vendor data."""
- vd = {'cloud-init': '#cloud-config\nkey: value'}
- ds, mocks = self._get_ds(patches={
- '_is_platform_viable': {'return_value': True},
- 'crawl_metadata': {
- 'return_value': {
- MD_VER: {'system_uuid': self.my_uuid,
- 'meta_data': self.my_md,
- 'vendor_data': vd}}}})
- self.assertTrue(ds._get_data())
- mocks._is_platform_viable.assert_called_once_with()
- mocks.crawl_metadata.assert_called_once_with()
- self.assertEqual(vd, ds.vendordata_pure)
- self.assertEqual(vd['cloud-init'], ds.vendordata_raw)
-
- @mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
- def test_with_userdata(self, m_is_iscsi_root):
- """Ensure user-data is populated if present and is binary."""
- my_userdata = b'abcdefg'
- ds, mocks = self._get_ds(patches={
- '_is_platform_viable': {'return_value': True},
- 'crawl_metadata': {
- 'return_value': {
- MD_VER: {'system_uuid': self.my_uuid,
- 'meta_data': self.my_md,
- 'user_data': my_userdata}}}})
- self.assertTrue(ds._get_data())
- mocks._is_platform_viable.assert_called_once_with()
- mocks.crawl_metadata.assert_called_once_with()
- self.assertEqual(self.my_uuid, ds.system_uuid)
- self.assertIn(self.my_md["public_keys"]["0"], ds.get_public_ssh_keys())
- self.assertEqual(self.my_md['uuid'], ds.get_instance_id())
- self.assertEqual(my_userdata, ds.userdata_raw)
-
- @mock.patch(DS_PATH + ".get_interfaces_by_mac", mock.Mock(return_value={}))
- @mock.patch(DS_PATH + "._add_network_config_from_opc_imds",
- side_effect=lambda network_config: network_config)
- @mock.patch(DS_PATH + ".cmdline.read_initramfs_config")
- @mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
- def test_network_cmdline(self, m_is_iscsi_root, m_initramfs_config,
- _m_add_network_config_from_opc_imds):
- """network_config should read kernel cmdline."""
- distro = mock.MagicMock()
- ds, _ = self._get_ds(distro=distro, patches={
- '_is_platform_viable': {'return_value': True},
- 'crawl_metadata': {
- 'return_value': {
- MD_VER: {'system_uuid': self.my_uuid,
- 'meta_data': self.my_md}}}})
- ncfg = {'version': 1, 'config': [{'a': 'b'}]}
- m_initramfs_config.return_value = ncfg
- self.assertTrue(ds._get_data())
- self.assertEqual(ncfg, ds.network_config)
- self.assertEqual([mock.call()], m_initramfs_config.call_args_list)
- self.assertFalse(distro.generate_fallback_config.called)
-
- @mock.patch(DS_PATH + ".get_interfaces_by_mac", mock.Mock(return_value={}))
- @mock.patch(DS_PATH + "._add_network_config_from_opc_imds",
- side_effect=lambda network_config: network_config)
- @mock.patch(DS_PATH + ".cmdline.read_initramfs_config")
- @mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
- def test_network_fallback(self, m_is_iscsi_root, m_initramfs_config,
- _m_add_network_config_from_opc_imds):
- """test that fallback network is generated if no kernel cmdline."""
- distro = mock.MagicMock()
- ds, _ = self._get_ds(distro=distro, patches={
- '_is_platform_viable': {'return_value': True},
- 'crawl_metadata': {
- 'return_value': {
- MD_VER: {'system_uuid': self.my_uuid,
- 'meta_data': self.my_md}}}})
- ncfg = {'version': 1, 'config': [{'a': 'b'}]}
- m_initramfs_config.return_value = None
- self.assertTrue(ds._get_data())
- ncfg = {'version': 1, 'config': [{'distro1': 'value'}]}
- distro.generate_fallback_config.return_value = ncfg
- self.assertEqual(ncfg, ds.network_config)
- self.assertEqual([mock.call()], m_initramfs_config.call_args_list)
- distro.generate_fallback_config.assert_called_once_with()
-
- # test that the result got cached, and the methods not re-called.
- self.assertEqual(ncfg, ds.network_config)
- self.assertEqual(1, m_initramfs_config.call_count)
-
- @mock.patch(DS_PATH + "._add_network_config_from_opc_imds")
- @mock.patch(DS_PATH + ".cmdline.read_initramfs_config",
- return_value={'some': 'config'})
- @mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
- def test_secondary_nics_added_to_network_config_if_enabled(
- self, _m_is_iscsi_root, _m_initramfs_config,
- m_add_network_config_from_opc_imds):
-
- needle = object()
-
- def network_config_side_effect(network_config):
- network_config['secondary_added'] = needle
-
- m_add_network_config_from_opc_imds.side_effect = (
- network_config_side_effect)
-
- distro = mock.MagicMock()
- ds, _ = self._get_ds(distro=distro, patches={
- '_is_platform_viable': {'return_value': True},
- 'crawl_metadata': {
- 'return_value': {
- MD_VER: {'system_uuid': self.my_uuid,
- 'meta_data': self.my_md}}}})
- ds.ds_cfg['configure_secondary_nics'] = True
- self.assertEqual(needle, ds.network_config['secondary_added'])
-
- @mock.patch(DS_PATH + "._add_network_config_from_opc_imds")
- @mock.patch(DS_PATH + ".cmdline.read_initramfs_config",
- return_value={'some': 'config'})
- @mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
- def test_secondary_nics_not_added_to_network_config_by_default(
- self, _m_is_iscsi_root, _m_initramfs_config,
- m_add_network_config_from_opc_imds):
-
- def network_config_side_effect(network_config):
- network_config['secondary_added'] = True
-
- m_add_network_config_from_opc_imds.side_effect = (
- network_config_side_effect)
-
- distro = mock.MagicMock()
- ds, _ = self._get_ds(distro=distro, patches={
- '_is_platform_viable': {'return_value': True},
- 'crawl_metadata': {
- 'return_value': {
- MD_VER: {'system_uuid': self.my_uuid,
- 'meta_data': self.my_md}}}})
- self.assertNotIn('secondary_added', ds.network_config)
-
- @mock.patch(DS_PATH + "._add_network_config_from_opc_imds")
- @mock.patch(DS_PATH + ".cmdline.read_initramfs_config")
- @mock.patch(DS_PATH + "._is_iscsi_root", return_value=True)
- def test_secondary_nic_failure_isnt_blocking(
- self, _m_is_iscsi_root, m_initramfs_config,
- m_add_network_config_from_opc_imds):
-
- m_add_network_config_from_opc_imds.side_effect = Exception()
-
- distro = mock.MagicMock()
- ds, _ = self._get_ds(distro=distro, patches={
- '_is_platform_viable': {'return_value': True},
- 'crawl_metadata': {
- 'return_value': {
- MD_VER: {'system_uuid': self.my_uuid,
- 'meta_data': self.my_md}}}})
- ds.ds_cfg['configure_secondary_nics'] = True
- self.assertEqual(ds.network_config, m_initramfs_config.return_value)
- self.assertIn('Failed to fetch secondary network configuration',
- self.logs.getvalue())
-
- def test_ds_network_cfg_preferred_over_initramfs(self):
- """Ensure that DS net config is preferred over initramfs config"""
- network_config_sources = oracle.DataSourceOracle.network_config_sources
- self.assertLess(
- network_config_sources.index(NetworkConfigSource.ds),
- network_config_sources.index(NetworkConfigSource.initramfs)
+# Fetched with `curl http://169.254.169.254/opc/v1/instance/` (and then
+# truncated for line length)
+OPC_V1_METADATA = """\
+{
+ "availabilityDomain" : "qIZq:PHX-AD-1",
+ "faultDomain" : "FAULT-DOMAIN-2",
+ "compartmentId" : "ocid1.tenancy.oc1..aaaaaaaao7f7cccogqrg5emjxkxmTRUNCATED",
+ "displayName" : "instance-20200320-1400",
+ "hostname" : "instance-20200320-1400",
+ "id" : "ocid1.instance.oc1.phx.anyhqljtniwq6syc3nex55sep5w34qbwmw6TRUNCATED",
+ "image" : "ocid1.image.oc1.phx.aaaaaaaagmkn4gdhvvx24kiahh2b2qchsicTRUNCATED",
+ "metadata" : {
+ "ssh_authorized_keys" : "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ truncated",
+ "user_data" : "IyEvYmluL3NoCnRvdWNoIC90bXAvZm9v"
+ },
+ "region" : "phx",
+ "canonicalRegionName" : "us-phoenix-1",
+ "ociAdName" : "phx-ad-3",
+ "shape" : "VM.Standard2.1",
+ "state" : "Running",
+ "timeCreated" : 1584727285318,
+ "agentConfig" : {
+ "monitoringDisabled" : true,
+ "managementDisabled" : true
+ }
+}"""
+
+
+@pytest.yield_fixture
+def oracle_ds(request, fixture_utils, paths):
+ """
+ Return an instantiated DataSourceOracle.
+
+ This also performs the mocking required for the default test case:
+ * ``_read_system_uuid`` returns something,
+ * ``_is_platform_viable`` returns True,
+ * ``_is_iscsi_root`` returns True (the simpler code path),
+ * ``read_opc_metadata`` returns ``OPC_V1_METADATA``
+
+ (This uses the paths fixture for the required helpers.Paths object, and the
+ fixture_utils fixture for fetching markers.)
+ """
+ sys_cfg = fixture_utils.closest_marker_first_arg_or(
+ request, "ds_sys_cfg", mock.MagicMock()
+ )
+ with mock.patch(DS_PATH + "._read_system_uuid", return_value="someuuid"):
+ with mock.patch(DS_PATH + "._is_platform_viable", return_value=True):
+ with mock.patch(DS_PATH + "._is_iscsi_root", return_value=True):
+ with mock.patch(
+ DS_PATH + ".read_opc_metadata",
+ return_value=json.loads(OPC_V1_METADATA),
+ ):
+ yield oracle.DataSourceOracle(
+ sys_cfg=sys_cfg, distro=mock.Mock(), paths=paths,
+ )
+
+
+class TestDataSourceOracle:
+ def test_platform_info(self, oracle_ds):
+ assert "oracle" == oracle_ds.cloud_name
+ assert "oracle" == oracle_ds.platform_type
+ assert (
+ "metadata (http://169.254.169.254/opc/v1/)"
+ == oracle_ds.subplatform
)
+ def test_secondary_nics_disabled_by_default(self, oracle_ds):
+ assert not oracle_ds.ds_cfg["configure_secondary_nics"]
-@mock.patch(DS_PATH + "._read_system_uuid", return_value=str(uuid.uuid4()))
-class TestReadMetaData(test_helpers.HttprettyTestCase):
- """Test the read_metadata which interacts with http metadata service."""
-
- mdurl = oracle.METADATA_ENDPOINT
- my_md = {"uuid": "ocid1.instance.oc1.phx.abyhqlj",
- "name": "ci-vm1", "availability_zone": "phx-ad-3",
- "hostname": "ci-vm1hostname",
- "launch_index": 0, "files": [],
- "public_keys": {"0": "ssh-rsa AAAAB3N...== user@host"},
- "meta": {}}
-
- def populate_md(self, data):
- """call httppretty.register_url for each item dict 'data',
- including valid indexes. Text values converted to bytes."""
- httpretty.register_uri(
- httpretty.GET, self.mdurl + MD_VER + "/",
- '\n'.join(data.keys()).encode('utf-8'))
- for k, v in data.items():
- httpretty.register_uri(
- httpretty.GET, self.mdurl + MD_VER + "/" + k,
- v if not isinstance(v, str) else v.encode('utf-8'))
-
- def test_broken_no_sys_uuid(self, m_read_system_uuid):
- """Datasource requires ability to read system_uuid and true return."""
- m_read_system_uuid.return_value = None
- self.assertRaises(BrokenMetadata, oracle.read_metadata)
-
- def test_broken_no_metadata_json(self, m_read_system_uuid):
- """Datasource requires meta_data.json."""
- httpretty.register_uri(
- httpretty.GET, self.mdurl + MD_VER + "/",
- '\n'.join(['user_data']).encode('utf-8'))
- with self.assertRaises(BrokenMetadata) as cm:
- oracle.read_metadata()
- self.assertIn("Required field 'meta_data.json' missing",
- str(cm.exception))
-
- def test_with_userdata(self, m_read_system_uuid):
- data = {'user_data': b'#!/bin/sh\necho hi world\n',
- 'meta_data.json': json.dumps(self.my_md)}
- self.populate_md(data)
- result = oracle.read_metadata()[MD_VER]
- self.assertEqual(data['user_data'], result['user_data'])
- self.assertEqual(self.my_md, result['meta_data'])
-
- def test_without_userdata(self, m_read_system_uuid):
- data = {'meta_data.json': json.dumps(self.my_md)}
- self.populate_md(data)
- result = oracle.read_metadata()[MD_VER]
- self.assertNotIn('user_data', result)
- self.assertEqual(self.my_md, result['meta_data'])
-
- def test_unknown_fields_included(self, m_read_system_uuid):
- """Unknown fields listed in index should be included.
- And those ending in .json should be decoded."""
- some_data = {'key1': 'data1', 'subk1': {'subd1': 'subv'}}
- some_vendor_data = {'cloud-init': 'foo'}
- data = {'meta_data.json': json.dumps(self.my_md),
- 'some_data.json': json.dumps(some_data),
- 'vendor_data.json': json.dumps(some_vendor_data),
- 'other_blob': b'this is blob'}
- self.populate_md(data)
- result = oracle.read_metadata()[MD_VER]
- self.assertNotIn('user_data', result)
- self.assertEqual(self.my_md, result['meta_data'])
- self.assertEqual(some_data, result['some_data'])
- self.assertEqual(some_vendor_data, result['vendor_data'])
- self.assertEqual(data['other_blob'], result['other_blob'])
+ @pytest.mark.ds_sys_cfg(
+ {"datasource": {"Oracle": {"configure_secondary_nics": True}}}
+ )
+ def test_sys_cfg_can_enable_configure_secondary_nics(self, oracle_ds):
+ assert oracle_ds.ds_cfg["configure_secondary_nics"]
class TestIsPlatformViable(test_helpers.CiTestCase):
@@ -407,73 +153,6 @@ class TestIsPlatformViable(test_helpers.CiTestCase):
m_read_dmi_data.assert_has_calls([mock.call('chassis-asset-tag')])
-class TestLoadIndex(test_helpers.CiTestCase):
- """_load_index handles parsing of an index into a proper list.
- The tests here guarantee correct parsing of html version or
- a fixed version. See the function docstring for more doc."""
-
- _known_html_api_versions = dedent("""\
- <html>
- <head><title>Index of /openstack/</title></head>
- <body bgcolor="white">
- <h1>Index of /openstack/</h1><hr><pre><a href="../">../</a>
- <a href="2013-10-17/">2013-10-17/</a> 27-Jun-2018 12:22 -
- <a href="latest/">latest/</a> 27-Jun-2018 12:22 -
- </pre><hr></body>
- </html>""")
-
- _known_html_contents = dedent("""\
- <html>
- <head><title>Index of /openstack/2013-10-17/</title></head>
- <body bgcolor="white">
- <h1>Index of /openstack/2013-10-17/</h1><hr><pre><a href="../">../</a>
- <a href="meta_data.json">meta_data.json</a> 27-Jun-2018 12:22 679
- <a href="user_data">user_data</a> 27-Jun-2018 12:22 146
- </pre><hr></body>
- </html>""")
-
- def test_parse_html(self):
- """Test parsing of lower case html."""
- self.assertEqual(
- ['2013-10-17/', 'latest/'],
- oracle._load_index(self._known_html_api_versions))
- self.assertEqual(
- ['meta_data.json', 'user_data'],
- oracle._load_index(self._known_html_contents))
-
- def test_parse_html_upper(self):
- """Test parsing of upper case html, although known content is lower."""
- def _toupper(data):
- return data.replace("<a", "<A").replace("html>", "HTML>")
-
- self.assertEqual(
- ['2013-10-17/', 'latest/'],
- oracle._load_index(_toupper(self._known_html_api_versions)))
- self.assertEqual(
- ['meta_data.json', 'user_data'],
- oracle._load_index(_toupper(self._known_html_contents)))
-
- def test_parse_newline_list_with_endl(self):
- """Test parsing of newline separated list with ending newline."""
- self.assertEqual(
- ['2013-10-17/', 'latest/'],
- oracle._load_index("\n".join(["2013-10-17/", "latest/", ""])))
- self.assertEqual(
- ['meta_data.json', 'user_data'],
- oracle._load_index("\n".join(["meta_data.json", "user_data", ""])))
-
- def test_parse_newline_list_without_endl(self):
- """Test parsing of newline separated list with no ending newline.
-
- Actual openstack implementation does not include trailing newline."""
- self.assertEqual(
- ['2013-10-17/', 'latest/'],
- oracle._load_index("\n".join(["2013-10-17/", "latest/"])))
- self.assertEqual(
- ['meta_data.json', 'user_data'],
- oracle._load_index("\n".join(["meta_data.json", "user_data"])))
-
-
class TestNetworkConfigFromOpcImds(test_helpers.CiTestCase):
with_logs = True
@@ -733,4 +412,309 @@ class TestNetworkConfigFiltersNetFailover(test_helpers.CiTestCase):
self.assertEqual(expected_cfg, netcfg)
+class TestReadOpcMetadata:
+ # See https://docs.pytest.org/en/stable/example
+ # /parametrize.html#parametrizing-conditional-raising
+ does_not_raise = ExitStack
+
+ @pytest.fixture(autouse=True)
+ def configure_opc_metadata_in_httpretty(self, httpretty):
+ """Configure HTTPretty with the various OPC metadata endpoints."""
+ httpretty.register_uri(
+ httpretty.GET,
+ "http://169.254.169.254/opc/v1/instance/",
+ OPC_V1_METADATA,
+ )
+
+ def test_json_decoded_value_returned(self):
+ # read_opc_metadata should JSON decode the response and return it
+ expected = json.loads(OPC_V1_METADATA)
+ assert expected == oracle.read_opc_metadata()
+
+ # No need to actually wait between retries in the tests
+ @mock.patch("cloudinit.url_helper.time.sleep", lambda _: None)
+ @pytest.mark.parametrize(
+ "failure_count,expectation",
+ [(1, does_not_raise()), (2, pytest.raises(UrlError))],
+ )
+ def test_retries(self, expectation, failure_count, httpretty):
+ responses = [httpretty.Response("", status=404)] * failure_count
+ responses.append(httpretty.Response(OPC_V1_METADATA))
+ httpretty.register_uri(
+ httpretty.GET,
+ "http://169.254.169.254/opc/v1/instance/",
+ responses=responses,
+ )
+ expected = json.loads(OPC_V1_METADATA)
+ with expectation:
+ assert expected == oracle.read_opc_metadata()
+
+
+class TestCommon_GetDataBehaviour:
+ """This test class tests behaviour common to iSCSI and non-iSCSI root.
+
+ It defines a fixture, parameterized_oracle_ds, which is used in all the
+ tests herein to test that the commonly expected behaviour is the same with
+ iSCSI root and without.
+
+ (As non-iSCSI root behaviour is a superset of iSCSI root behaviour this
+ class is implicitly also testing all iSCSI root behaviour so there is no
+ separate class for that case.)
+ """
+
+ @pytest.yield_fixture(params=[True, False])
+ def parameterized_oracle_ds(self, request, oracle_ds):
+ """oracle_ds parameterized for iSCSI and non-iSCSI root respectively"""
+ is_iscsi_root = request.param
+ with ExitStack() as stack:
+ stack.enter_context(
+ mock.patch(
+ DS_PATH + "._is_iscsi_root", return_value=is_iscsi_root
+ )
+ )
+ if not is_iscsi_root:
+ stack.enter_context(
+ mock.patch(DS_PATH + ".net.find_fallback_nic")
+ )
+ stack.enter_context(
+ mock.patch(DS_PATH + ".dhcp.EphemeralDHCPv4")
+ )
+ yield oracle_ds
+
+ @mock.patch(
+ DS_PATH + "._is_platform_viable", mock.Mock(return_value=False)
+ )
+ def test_false_if_platform_not_viable(
+ self, parameterized_oracle_ds,
+ ):
+ assert not parameterized_oracle_ds._get_data()
+
+ @pytest.mark.parametrize(
+ "keyname,expected_value",
+ (
+ ("availability-zone", "phx-ad-3"),
+ ("launch-index", 0),
+ ("local-hostname", "instance-20200320-1400"),
+ (
+ "instance-id",
+ "ocid1.instance.oc1.phx"
+ ".anyhqljtniwq6syc3nex55sep5w34qbwmw6TRUNCATED",
+ ),
+ ("name", "instance-20200320-1400"),
+ (
+ "public_keys",
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQ truncated",
+ ),
+ ),
+ )
+ def test_metadata_keys_set_correctly(
+ self, keyname, expected_value, parameterized_oracle_ds,
+ ):
+ assert parameterized_oracle_ds._get_data()
+ assert expected_value == parameterized_oracle_ds.metadata[keyname]
+
+ @pytest.mark.parametrize(
+ "attribute_name,expected_value",
+ [
+ ("_crawled_metadata", json.loads(OPC_V1_METADATA)),
+ (
+ "userdata_raw",
+ base64.b64decode(b"IyEvYmluL3NoCnRvdWNoIC90bXAvZm9v"),
+ ),
+ ("system_uuid", "my-test-uuid"),
+ ],
+ )
+ @mock.patch(
+ DS_PATH + "._read_system_uuid", mock.Mock(return_value="my-test-uuid")
+ )
+ def test_attributes_set_correctly(
+ self, attribute_name, expected_value, parameterized_oracle_ds,
+ ):
+ assert parameterized_oracle_ds._get_data()
+ assert expected_value == getattr(
+ parameterized_oracle_ds, attribute_name
+ )
+
+ @pytest.mark.parametrize(
+ "ssh_keys,expected_value",
+ [
+ # No SSH keys in metadata => no keys detected
+ (None, []),
+ # Empty SSH keys in metadata => no keys detected
+ ("", []),
+ # Single SSH key in metadata => single key detected
+ ("ssh-rsa ... test@test", ["ssh-rsa ... test@test"]),
+ # Multiple SSH keys in metadata => multiple keys detected
+ (
+ "ssh-rsa ... test@test\nssh-rsa ... test2@test2",
+ ["ssh-rsa ... test@test", "ssh-rsa ... test2@test2"],
+ ),
+ ],
+ )
+ def test_public_keys_handled_correctly(
+ self, ssh_keys, expected_value, parameterized_oracle_ds
+ ):
+ metadata = json.loads(OPC_V1_METADATA)
+ if ssh_keys is None:
+ del metadata["metadata"]["ssh_authorized_keys"]
+ else:
+ metadata["metadata"]["ssh_authorized_keys"] = ssh_keys
+ with mock.patch(
+ DS_PATH + ".read_opc_metadata", mock.Mock(return_value=metadata),
+ ):
+ assert parameterized_oracle_ds._get_data()
+ assert (
+ expected_value == parameterized_oracle_ds.get_public_ssh_keys()
+ )
+
+ def test_missing_user_data_handled_gracefully(
+ self, parameterized_oracle_ds
+ ):
+ metadata = json.loads(OPC_V1_METADATA)
+ del metadata["metadata"]["user_data"]
+ with mock.patch(
+ DS_PATH + ".read_opc_metadata", mock.Mock(return_value=metadata),
+ ):
+ assert parameterized_oracle_ds._get_data()
+
+ assert parameterized_oracle_ds.userdata_raw is None
+
+ def test_missing_metadata_handled_gracefully(
+ self, parameterized_oracle_ds
+ ):
+ metadata = json.loads(OPC_V1_METADATA)
+ del metadata["metadata"]
+ with mock.patch(
+ DS_PATH + ".read_opc_metadata", mock.Mock(return_value=metadata),
+ ):
+ assert parameterized_oracle_ds._get_data()
+
+ assert parameterized_oracle_ds.userdata_raw is None
+ assert [] == parameterized_oracle_ds.get_public_ssh_keys()
+
+
+@mock.patch(DS_PATH + "._is_iscsi_root", lambda: False)
+class TestNonIscsiRoot_GetDataBehaviour:
+ @mock.patch(DS_PATH + ".dhcp.EphemeralDHCPv4")
+ @mock.patch(DS_PATH + ".net.find_fallback_nic")
+ def test_read_opc_metadata_called_with_ephemeral_dhcp(
+ self, m_find_fallback_nic, m_EphemeralDHCPv4, oracle_ds
+ ):
+ in_context_manager = False
+
+ def enter_context_manager():
+ nonlocal in_context_manager
+ in_context_manager = True
+
+ def exit_context_manager(*args):
+ nonlocal in_context_manager
+ in_context_manager = False
+
+ m_EphemeralDHCPv4.return_value.__enter__.side_effect = (
+ enter_context_manager
+ )
+ m_EphemeralDHCPv4.return_value.__exit__.side_effect = (
+ exit_context_manager
+ )
+
+ def assert_in_context_manager():
+ assert in_context_manager
+ return mock.MagicMock()
+
+ with mock.patch(
+ DS_PATH + ".read_opc_metadata",
+ mock.Mock(side_effect=assert_in_context_manager),
+ ):
+ assert oracle_ds._get_data()
+
+ assert [
+ mock.call(m_find_fallback_nic.return_value)
+ ] == m_EphemeralDHCPv4.call_args_list
+
+
+@mock.patch(DS_PATH + ".get_interfaces_by_mac", lambda: {})
+@mock.patch(DS_PATH + ".cmdline.read_initramfs_config")
+class TestNetworkConfig:
+ def test_network_config_cached(self, m_read_initramfs_config, oracle_ds):
+ """.network_config should be cached"""
+ assert 0 == m_read_initramfs_config.call_count
+ oracle_ds.network_config # pylint: disable=pointless-statement
+ assert 1 == m_read_initramfs_config.call_count
+ oracle_ds.network_config # pylint: disable=pointless-statement
+ assert 1 == m_read_initramfs_config.call_count
+
+ def test_network_cmdline(self, m_read_initramfs_config, oracle_ds):
+ """network_config should prefer initramfs config over fallback"""
+ ncfg = {"version": 1, "config": [{"a": "b"}]}
+ m_read_initramfs_config.return_value = copy.deepcopy(ncfg)
+
+ assert ncfg == oracle_ds.network_config
+ assert 0 == oracle_ds.distro.generate_fallback_config.call_count
+
+ def test_network_fallback(self, m_read_initramfs_config, oracle_ds):
+ """network_config should prefer initramfs config over fallback"""
+ ncfg = {"version": 1, "config": [{"a": "b"}]}
+
+ m_read_initramfs_config.return_value = None
+ oracle_ds.distro.generate_fallback_config.return_value = copy.deepcopy(
+ ncfg
+ )
+
+ assert ncfg == oracle_ds.network_config
+
+ @pytest.mark.parametrize(
+ "configure_secondary_nics,expect_secondary_nics",
+ [(True, True), (False, False), (None, False)],
+ )
+ @mock.patch(DS_PATH + "._add_network_config_from_opc_imds")
+ def test_secondary_nic_addition(
+ self,
+ m_add_network_config_from_opc_imds,
+ m_read_initramfs_config,
+ configure_secondary_nics,
+ expect_secondary_nics,
+ oracle_ds,
+ ):
+ """Test that _add_network_config_from_opc_imds is called as expected
+
+ (configure_secondary_nics=None is used to test the default behaviour.)
+ """
+ m_read_initramfs_config.return_value = {"version": 1, "config": []}
+
+ def side_effect(network_config):
+ network_config["secondary_added"] = mock.sentinel.needle
+
+ m_add_network_config_from_opc_imds.side_effect = side_effect
+
+ if configure_secondary_nics is not None:
+ oracle_ds.ds_cfg[
+ "configure_secondary_nics"
+ ] = configure_secondary_nics
+
+ was_secondary_added = "secondary_added" in oracle_ds.network_config
+ assert expect_secondary_nics == was_secondary_added
+
+ @mock.patch(DS_PATH + "._add_network_config_from_opc_imds")
+ def test_secondary_nic_failure_isnt_blocking(
+ self,
+ m_add_network_config_from_opc_imds,
+ m_read_initramfs_config,
+ caplog,
+ oracle_ds,
+ ):
+ m_add_network_config_from_opc_imds.side_effect = Exception()
+
+ oracle_ds.ds_cfg["configure_secondary_nics"] = True
+
+ assert m_read_initramfs_config.return_value == oracle_ds.network_config
+ assert "Failed to fetch secondary network configuration" in caplog.text
+
+ def test_ds_network_cfg_preferred_over_initramfs(self, _m):
+ """Ensure that DS net config is preferred over initramfs config"""
+ config_sources = oracle.DataSourceOracle.network_config_sources
+ ds_idx = config_sources.index(NetworkConfigSource.ds)
+ initramfs_idx = config_sources.index(NetworkConfigSource.initramfs)
+ assert ds_idx < initramfs_idx
+
+
# vi: ts=4 expandtab
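
The does_not_raise = ExitStack idiom used by TestReadOpcMetadata above follows the pytest documentation's recipe for parametrizing conditional raising. A minimal, self-contained sketch of the pattern (a hypothetical test, not part of this commit):

from contextlib import ExitStack

import pytest

does_not_raise = ExitStack  # a context manager in which no exception is expected


@pytest.mark.parametrize(
    "divisor,expectation",
    [(1, does_not_raise()), (0, pytest.raises(ZeroDivisionError))],
)
def test_reciprocal(divisor, expectation):
    # Each parametrized case supplies its own context manager: ExitStack()
    # expects the body to succeed, pytest.raises() expects the exception.
    with expectation:
        assert 1 / divisor == 1
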
diff --git a/conftest.py b/conftest.py
index faf13804..76e9000a 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,24 +1,64 @@
+import os
from unittest import mock
import pytest
+import httpretty as _httpretty
-from cloudinit import subp
+from cloudinit import helpers, subp
-def _closest_marker_args_or(request, marker_name: str, default):
- """Get the args for the closest ``marker_name`` or return ``default``"""
- try:
- marker = request.node.get_closest_marker(marker_name)
- except AttributeError:
- # Older versions of pytest don't have the new API
- marker = request.node.get_marker(marker_name)
- if marker is not None:
- return marker.args
- return default
+class _FixtureUtils:
+ """A namespace for fixture helper functions, used by fixture_utils.
+
+ These helper functions are all defined as staticmethods so they are
+ effectively functions; they are defined in a class only to give us a
+ namespace so calling them can look like
+ ``fixture_utils.fixture_util_function()`` in test code.
+ """
+
+ @staticmethod
+ def closest_marker_args_or(request, marker_name: str, default):
+ """Get the args for closest ``marker_name`` or return ``default``
+
+ :param request:
+ A pytest request, as passed to a fixture.
+ :param marker_name:
+ The name of the marker to look for
+ :param default:
+ The value to return if ``marker_name`` is not found.
+
+ :return:
+ The args for the closest ``marker_name`` marker, or ``default``
+ if no such marker is found.
+ """
+ try:
+ marker = request.node.get_closest_marker(marker_name)
+ except AttributeError:
+ # Older versions of pytest don't have the new API
+ marker = request.node.get_marker(marker_name)
+ if marker is not None:
+ return marker.args
+ return default
+
+ @staticmethod
+ def closest_marker_first_arg_or(request, marker_name: str, default):
+ """Get the first arg for closest ``marker_name`` or return ``default``
+
+ This is a convenience wrapper around closest_marker_args_or, see there
+ for full details.
+ """
+ result = _FixtureUtils.closest_marker_args_or(
+ request, marker_name, [default]
+ )
+ if not result:
+ raise TypeError(
+ "Missing expected argument to {} marker".format(marker_name)
+ )
+ return result[0]
@pytest.yield_fixture(autouse=True)
-def disable_subp_usage(request):
+def disable_subp_usage(request, fixture_utils):
"""
Across all (pytest) tests, ensure that subp.subp is not invoked.
@@ -53,10 +93,14 @@ def disable_subp_usage(request):
tests, CiTestCase's allowed_subp does take precedence (and we have
TestDisableSubpUsageInTestSubclass to confirm that).
"""
- allow_subp_for = _closest_marker_args_or(request, "allow_subp_for", None)
+ allow_subp_for = fixture_utils.closest_marker_args_or(
+ request, "allow_subp_for", None
+ )
# Because the mark doesn't take arguments, `allow_all_subp` will be set to
# [] if the marker is present, so explicit None checks are required
- allow_all_subp = _closest_marker_args_or(request, "allow_all_subp", None)
+ allow_all_subp = fixture_utils.closest_marker_args_or(
+ request, "allow_all_subp", None
+ )
if allow_all_subp is not None and allow_subp_for is None:
# Only allow_all_subp specified, don't mock subp.subp
@@ -93,3 +137,47 @@ def disable_subp_usage(request):
with mock.patch("cloudinit.subp.subp", autospec=True) as m_subp:
m_subp.side_effect = side_effect
yield
+
+
+@pytest.fixture(scope="session")
+def fixture_utils():
+ """Return a namespace containing fixture utility functions.
+
+ See :py:class:`_FixtureUtils` for further details."""
+ return _FixtureUtils
+
+
+@pytest.yield_fixture
+def httpretty():
+ """
+ Enable HTTPretty for duration of the testcase, resetting before and after.
+
+ This will also ensure allow_net_connect is set to False, and temporarily
+ unset http_proxy in os.environ if present (to work around
+ https://github.com/gabrielfalcao/HTTPretty/issues/122).
+ """
+ restore_proxy = os.environ.pop("http_proxy", None)
+ _httpretty.HTTPretty.allow_net_connect = False
+ _httpretty.reset()
+ _httpretty.enable()
+
+ yield _httpretty
+
+ _httpretty.disable()
+ _httpretty.reset()
+ if restore_proxy is not None:
+ os.environ["http_proxy"] = restore_proxy
+
+
+@pytest.fixture
+def paths(tmpdir):
+ """
+ Return a helpers.Paths object configured to use a tmpdir.
+
+ (This uses the builtin tmpdir fixture.)
+ """
+ dirs = {
+ "cloud_dir": tmpdir.mkdir("cloud_dir").strpath,
+ "run_dir": tmpdir.mkdir("run_dir").strpath,
+ }
+ return helpers.Paths(dirs)
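
With closest_marker_args_or now available to any fixture via fixture_utils, marker-driven configuration stays declarative at the test site. As a hedged example (the test body is illustrative, not from this commit), the existing allow_subp_for marker that disable_subp_usage consumes works like this:

import pytest

from cloudinit import subp


@pytest.mark.allow_subp_for("whoami")
def test_whoami_is_permitted():
    # disable_subp_usage reads the marker via closest_marker_args_or and
    # allows subp.subp only for the listed command(s).
    out, _err = subp.subp(["whoami"])
    assert out.strip()
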
diff --git a/tox.ini b/tox.ini
index 3fd96702..6c4b2e81 100644
--- a/tox.ini
+++ b/tox.ini
@@ -140,3 +140,4 @@ addopts = --strict
markers =
allow_subp_for: allow subp usage for the given commands (disable_subp_usage)
allow_all_subp: allow all subp usage (disable_subp_usage)
+ ds_sys_cfg: a sys_cfg dict to be used by datasource fixtures