summaryrefslogtreecommitdiff
path: root/tests/unittests/test_ds_identify.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_ds_identify.py')
-rw-r--r--tests/unittests/test_ds_identify.py243
1 files changed, 225 insertions, 18 deletions
diff --git a/tests/unittests/test_ds_identify.py b/tests/unittests/test_ds_identify.py
index 53643989..46778e95 100644
--- a/tests/unittests/test_ds_identify.py
+++ b/tests/unittests/test_ds_identify.py
@@ -1,5 +1,6 @@
# This file is part of cloud-init. See LICENSE file for license information.
+from collections import namedtuple
import copy
import os
from uuid import uuid4
@@ -7,9 +8,11 @@ from uuid import uuid4
from cloudinit import safeyaml
from cloudinit import util
from cloudinit.tests.helpers import (
- CiTestCase, dir2dict, populate_dir)
+ CiTestCase, dir2dict, populate_dir, populate_dir_with_ts)
-from cloudinit.sources import DataSourceIBMCloud as dsibm
+from cloudinit.sources import DataSourceIBMCloud as ds_ibm
+from cloudinit.sources import DataSourceSmartOS as ds_smartos
+from cloudinit.sources import DataSourceOracle as ds_oracle
UNAME_MYSYS = ("Linux bart 4.4.0-62-generic #83-Ubuntu "
"SMP Wed Jan 18 14:10:15 UTC 2017 x86_64 GNU/Linux")
@@ -66,19 +69,29 @@ P_SYS_VENDOR = "sys/class/dmi/id/sys_vendor"
P_SEED_DIR = "var/lib/cloud/seed"
P_DSID_CFG = "etc/cloud/ds-identify.cfg"
-IBM_PROVISIONING_CHECK_PATH = "/root/provisioningConfiguration.cfg"
IBM_CONFIG_UUID = "9796-932E"
+MOCK_VIRT_IS_CONTAINER_OTHER = {'name': 'detect_virt',
+ 'RET': 'container-other', 'ret': 0}
MOCK_VIRT_IS_KVM = {'name': 'detect_virt', 'RET': 'kvm', 'ret': 0}
MOCK_VIRT_IS_VMWARE = {'name': 'detect_virt', 'RET': 'vmware', 'ret': 0}
+# currenty' SmartOS hypervisor "bhyve" is unknown by systemd-detect-virt.
+MOCK_VIRT_IS_VM_OTHER = {'name': 'detect_virt', 'RET': 'vm-other', 'ret': 0}
MOCK_VIRT_IS_XEN = {'name': 'detect_virt', 'RET': 'xen', 'ret': 0}
MOCK_UNAME_IS_PPC64 = {'name': 'uname', 'out': UNAME_PPC64EL, 'ret': 0}
+shell_true = 0
+shell_false = 1
-class TestDsIdentify(CiTestCase):
+CallReturn = namedtuple('CallReturn',
+ ['rc', 'stdout', 'stderr', 'cfg', 'files'])
+
+
+class DsIdentifyBase(CiTestCase):
dsid_path = os.path.realpath('tools/ds-identify')
+ allowed_subp = ['sh']
- def call(self, rootd=None, mocks=None, args=None, files=None,
+ def call(self, rootd=None, mocks=None, func="main", args=None, files=None,
policy_dmi=DI_DEFAULT_POLICY,
policy_no_dmi=DI_DEFAULT_POLICY_NO_DMI,
ec2_strict_id=DI_EC2_STRICT_ID_DEFAULT):
@@ -135,7 +148,7 @@ class TestDsIdentify(CiTestCase):
mocklines.append(write_mock(d))
endlines = [
- 'main %s' % ' '.join(['"%s"' % s for s in args])
+ func + ' ' + ' '.join(['"%s"' % s for s in args])
]
with open(wrap, "w") as fp:
@@ -159,12 +172,14 @@ class TestDsIdentify(CiTestCase):
cfg = {"_INVALID_YAML": contents,
"_EXCEPTION": str(e)}
- return rc, out, err, cfg, dir2dict(rootd)
+ return CallReturn(rc, out, err, cfg, dir2dict(rootd))
def _call_via_dict(self, data, rootd=None, **kwargs):
# return output of self.call with a dict input like VALID_CFG[item]
xwargs = {'rootd': rootd}
- for k in ('mocks', 'args', 'policy_dmi', 'policy_no_dmi', 'files'):
+ passthrough = ('mocks', 'func', 'args', 'policy_dmi',
+ 'policy_no_dmi', 'files')
+ for k in passthrough:
if k in data:
xwargs[k] = data[k]
if k in kwargs:
@@ -178,18 +193,21 @@ class TestDsIdentify(CiTestCase):
data, RC_FOUND, dslist=[data.get('ds'), DS_NONE])
def _check_via_dict(self, data, rc, dslist=None, **kwargs):
- found_rc, out, err, cfg, files = self._call_via_dict(data, **kwargs)
+ ret = self._call_via_dict(data, **kwargs)
good = False
try:
- self.assertEqual(rc, found_rc)
+ self.assertEqual(rc, ret.rc)
if dslist is not None:
- self.assertEqual(dslist, cfg['datasource_list'])
+ self.assertEqual(dslist, ret.cfg['datasource_list'])
good = True
finally:
if not good:
- _print_run_output(rc, out, err, cfg, files)
- return rc, out, err, cfg, files
+ _print_run_output(ret.rc, ret.stdout, ret.stderr, ret.cfg,
+ ret.files)
+ return ret
+
+class TestDsIdentify(DsIdentifyBase):
def test_wb_print_variables(self):
"""_print_info reports an array of discovered variables to stderr."""
data = VALID_CFG['Azure-dmi-detection']
@@ -237,20 +255,50 @@ class TestDsIdentify(CiTestCase):
def test_config_drive(self):
"""ConfigDrive datasource has a disk with LABEL=config-2."""
self._test_ds_found('ConfigDrive')
- return
def test_config_drive_upper(self):
"""ConfigDrive datasource has a disk with LABEL=CONFIG-2."""
self._test_ds_found('ConfigDriveUpper')
return
+ def test_config_drive_seed(self):
+ """Config Drive seed directory."""
+ self._test_ds_found('ConfigDrive-seed')
+
+ def test_config_drive_interacts_with_ibmcloud_config_disk(self):
+ """Verify ConfigDrive interaction with IBMCloud.
+
+ If ConfigDrive is enabled and not IBMCloud, then ConfigDrive
+ should claim the ibmcloud 'config-2' disk.
+ If IBMCloud is enabled, then ConfigDrive should skip."""
+ data = copy.deepcopy(VALID_CFG['IBMCloud-config-2'])
+ files = data.get('files', {})
+ if not files:
+ data['files'] = files
+ cfgpath = 'etc/cloud/cloud.cfg.d/99_networklayer_common.cfg'
+
+ # with list including IBMCloud, config drive should be not found.
+ files[cfgpath] = 'datasource_list: [ ConfigDrive, IBMCloud ]\n'
+ ret = self._check_via_dict(data, shell_true)
+ self.assertEqual(
+ ret.cfg.get('datasource_list'), ['IBMCloud', 'None'])
+
+ # But if IBMCloud is not enabled, config drive should claim this.
+ files[cfgpath] = 'datasource_list: [ ConfigDrive, NoCloud ]\n'
+ ret = self._check_via_dict(data, shell_true)
+ self.assertEqual(
+ ret.cfg.get('datasource_list'), ['ConfigDrive', 'None'])
+
def test_ibmcloud_template_userdata_in_provisioning(self):
"""Template provisioned with user-data during provisioning stage.
Template provisioning with user-data has METADATA disk,
datasource should return not found."""
data = copy.deepcopy(VALID_CFG['IBMCloud-metadata'])
- data['files'] = {IBM_PROVISIONING_CHECK_PATH: 'xxx'}
+ # change the 'is_ibm_provisioning' mock to return 1 (false)
+ isprov_m = [m for m in data['mocks']
+ if m["name"] == "is_ibm_provisioning"][0]
+ isprov_m['ret'] = shell_true
return self._check_via_dict(data, RC_NOT_FOUND)
def test_ibmcloud_template_userdata(self):
@@ -265,7 +313,8 @@ class TestDsIdentify(CiTestCase):
no disks attached. Datasource should return not found."""
data = copy.deepcopy(VALID_CFG['IBMCloud-nodisks'])
- data['files'] = {IBM_PROVISIONING_CHECK_PATH: 'xxx'}
+ data['mocks'].append(
+ {'name': 'is_ibm_provisioning', 'ret': shell_true})
return self._check_via_dict(data, RC_NOT_FOUND)
def test_ibmcloud_template_no_userdata(self):
@@ -290,11 +339,42 @@ class TestDsIdentify(CiTestCase):
break
if not offset:
raise ValueError("Expected to find 'blkid' mock, but did not.")
- data['mocks'][offset]['out'] = d['out'].replace(dsibm.IBM_CONFIG_UUID,
+ data['mocks'][offset]['out'] = d['out'].replace(ds_ibm.IBM_CONFIG_UUID,
"DEAD-BEEF")
self._check_via_dict(
data, rc=RC_FOUND, dslist=['ConfigDrive', DS_NONE])
+ def test_ibmcloud_with_nocloud_seed(self):
+ """NoCloud seed should be preferred over IBMCloud.
+
+ A nocloud seed should be preferred over IBMCloud even if enabled.
+ Ubuntu 16.04 images have <vlc>/seed/nocloud-net. LP: #1766401."""
+ data = copy.deepcopy(VALID_CFG['IBMCloud-config-2'])
+ files = data.get('files', {})
+ if not files:
+ data['files'] = files
+ files.update(VALID_CFG['NoCloud-seed']['files'])
+ ret = self._check_via_dict(data, shell_true)
+ self.assertEqual(
+ ['NoCloud', 'IBMCloud', 'None'],
+ ret.cfg.get('datasource_list'))
+
+ def test_ibmcloud_with_configdrive_seed(self):
+ """ConfigDrive seed should be preferred over IBMCloud.
+
+ A ConfigDrive seed should be preferred over IBMCloud even if enabled.
+ Ubuntu 16.04 images have a fstab entry that mounts the
+ METADATA disk into <vlc>/seed/config_drive. LP: ##1766401."""
+ data = copy.deepcopy(VALID_CFG['IBMCloud-config-2'])
+ files = data.get('files', {})
+ if not files:
+ data['files'] = files
+ files.update(VALID_CFG['ConfigDrive-seed']['files'])
+ ret = self._check_via_dict(data, shell_true)
+ self.assertEqual(
+ ['ConfigDrive', 'IBMCloud', 'None'],
+ ret.cfg.get('datasource_list'))
+
def test_policy_disabled(self):
"""A Builtin policy of 'disabled' should return not found.
@@ -445,6 +525,92 @@ class TestDsIdentify(CiTestCase):
"""Hetzner cloud is identified in sys_vendor."""
self._test_ds_found('Hetzner')
+ def test_smartos_bhyve(self):
+ """SmartOS cloud identified by SmartDC in dmi."""
+ self._test_ds_found('SmartOS-bhyve')
+
+ def test_smartos_lxbrand(self):
+ """SmartOS cloud identified on lxbrand container."""
+ self._test_ds_found('SmartOS-lxbrand')
+
+ def test_smartos_lxbrand_requires_socket(self):
+ """SmartOS cloud should not be identified if no socket file."""
+ mycfg = copy.deepcopy(VALID_CFG['SmartOS-lxbrand'])
+ del mycfg['files'][ds_smartos.METADATA_SOCKFILE]
+ self._check_via_dict(mycfg, rc=RC_NOT_FOUND, policy_dmi="disabled")
+
+ def test_path_env_gets_set_from_main(self):
+ """PATH environment should always have some tokens when main is run.
+
+ We explicitly call main as we want to ensure it updates PATH."""
+ cust = copy.deepcopy(VALID_CFG['NoCloud'])
+ rootd = self.tmp_dir()
+ mpp = 'main-printpath'
+ pre = "MYPATH="
+ cust['files'][mpp] = (
+ 'PATH="/mycust/path"; main; r=$?; echo ' + pre + '$PATH; exit $r;')
+ ret = self._check_via_dict(
+ cust, RC_FOUND,
+ func=".", args=[os.path.join(rootd, mpp)], rootd=rootd)
+ line = [l for l in ret.stdout.splitlines() if l.startswith(pre)][0]
+ toks = line.replace(pre, "").split(":")
+ expected = ["/sbin", "/bin", "/usr/sbin", "/usr/bin", "/mycust/path"]
+ self.assertEqual(expected, [p for p in expected if p in toks],
+ "path did not have expected tokens")
+
+
+class TestIsIBMProvisioning(DsIdentifyBase):
+ """Test the is_ibm_provisioning method in ds-identify."""
+
+ inst_log = "/root/swinstall.log"
+ prov_cfg = "/root/provisioningConfiguration.cfg"
+ boot_ref = "/proc/1/environ"
+ funcname = "is_ibm_provisioning"
+
+ def test_no_config(self):
+ """No provisioning config means not provisioning."""
+ ret = self.call(files={}, func=self.funcname)
+ self.assertEqual(shell_false, ret.rc)
+
+ def test_config_only(self):
+ """A provisioning config without a log means provisioning."""
+ ret = self.call(files={self.prov_cfg: "key=value"}, func=self.funcname)
+ self.assertEqual(shell_true, ret.rc)
+
+ def test_config_with_old_log(self):
+ """A config with a log from previous boot is not provisioning."""
+ rootd = self.tmp_dir()
+ data = {self.prov_cfg: ("key=value\nkey2=val2\n", -10),
+ self.inst_log: ("log data\n", -30),
+ self.boot_ref: ("PWD=/", 0)}
+ populate_dir_with_ts(rootd, data)
+ ret = self.call(rootd=rootd, func=self.funcname)
+ self.assertEqual(shell_false, ret.rc)
+ self.assertIn("from previous boot", ret.stderr)
+
+ def test_config_with_new_log(self):
+ """A config with a log from this boot is provisioning."""
+ rootd = self.tmp_dir()
+ data = {self.prov_cfg: ("key=value\nkey2=val2\n", -10),
+ self.inst_log: ("log data\n", 30),
+ self.boot_ref: ("PWD=/", 0)}
+ populate_dir_with_ts(rootd, data)
+ ret = self.call(rootd=rootd, func=self.funcname)
+ self.assertEqual(shell_true, ret.rc)
+ self.assertIn("from current boot", ret.stderr)
+
+
+class TestOracle(DsIdentifyBase):
+ def test_found_by_chassis(self):
+ """Simple positive test of Oracle by chassis id."""
+ self._test_ds_found('Oracle')
+
+ def test_not_found(self):
+ """Simple negative test of Oracle."""
+ mycfg = copy.deepcopy(VALID_CFG['Oracle'])
+ mycfg['files'][P_CHASSIS_ASSET_TAG] = "Not Oracle"
+ self._check_via_dict(mycfg, rc=RC_NOT_FOUND)
+
def blkid_out(disks=None):
"""Convert a list of disk dictionaries into blkid content."""
@@ -631,6 +797,12 @@ VALID_CFG = {
},
],
},
+ 'ConfigDrive-seed': {
+ 'ds': 'ConfigDrive',
+ 'files': {
+ os.path.join(P_SEED_DIR, 'config_drive', 'openstack',
+ 'latest', 'meta_data.json'): 'md\n'},
+ },
'Hetzner': {
'ds': 'Hetzner',
'files': {P_SYS_VENDOR: 'Hetzner\n'},
@@ -639,6 +811,7 @@ VALID_CFG = {
'ds': 'IBMCloud',
'mocks': [
MOCK_VIRT_IS_XEN,
+ {'name': 'is_ibm_provisioning', 'ret': shell_false},
{'name': 'blkid', 'ret': 0,
'out': blkid_out(
[{'DEVNAME': 'xvda1', 'TYPE': 'vfat', 'PARTUUID': uuid4()},
@@ -652,12 +825,13 @@ VALID_CFG = {
'ds': 'IBMCloud',
'mocks': [
MOCK_VIRT_IS_XEN,
+ {'name': 'is_ibm_provisioning', 'ret': shell_false},
{'name': 'blkid', 'ret': 0,
'out': blkid_out(
[{'DEVNAME': 'xvda1', 'TYPE': 'ext3', 'PARTUUID': uuid4(),
'UUID': uuid4(), 'LABEL': 'cloudimg-bootfs'},
{'DEVNAME': 'xvdb', 'TYPE': 'vfat', 'LABEL': 'config-2',
- 'UUID': dsibm.IBM_CONFIG_UUID},
+ 'UUID': ds_ibm.IBM_CONFIG_UUID},
{'DEVNAME': 'xvda2', 'TYPE': 'ext4',
'LABEL': 'cloudimg-rootfs', 'PARTUUID': uuid4(),
'UUID': uuid4()},
@@ -669,6 +843,7 @@ VALID_CFG = {
'ds': 'IBMCloud',
'mocks': [
MOCK_VIRT_IS_XEN,
+ {'name': 'is_ibm_provisioning', 'ret': shell_false},
{'name': 'blkid', 'ret': 0,
'out': blkid_out(
[{'DEVNAME': 'xvda1', 'TYPE': 'vfat', 'PARTUUID': uuid4()},
@@ -677,6 +852,38 @@ VALID_CFG = {
},
],
},
+ 'Oracle': {
+ 'ds': 'Oracle',
+ 'files': {
+ P_CHASSIS_ASSET_TAG: ds_oracle.CHASSIS_ASSET_TAG + '\n',
+ }
+ },
+ 'SmartOS-bhyve': {
+ 'ds': 'SmartOS',
+ 'mocks': [
+ MOCK_VIRT_IS_VM_OTHER,
+ {'name': 'blkid', 'ret': 0,
+ 'out': blkid_out(
+ [{'DEVNAME': 'vda1', 'TYPE': 'ext4',
+ 'PARTUUID': '49ec635a-01'},
+ {'DEVNAME': 'vda2', 'TYPE': 'swap',
+ 'LABEL': 'cloudimg-swap', 'PARTUUID': '49ec635a-02'}]),
+ },
+ ],
+ 'files': {P_PRODUCT_NAME: 'SmartDC HVM\n'},
+ },
+ 'SmartOS-lxbrand': {
+ 'ds': 'SmartOS',
+ 'mocks': [
+ MOCK_VIRT_IS_CONTAINER_OTHER,
+ {'name': 'uname', 'ret': 0,
+ 'out': ("Linux d43da87a-daca-60e8-e6d4-d2ed372662a3 4.3.0 "
+ "BrandZ virtual linux x86_64 GNU/Linux")},
+ {'name': 'blkid', 'ret': 2, 'out': ''},
+ ],
+ 'files': {ds_smartos.METADATA_SOCKFILE: 'would be a socket\n'},
+ }
+
}
# vi: ts=4 expandtab