diff options
Diffstat (limited to 'tests/unittests/test_ds_identify.py')
-rw-r--r-- | tests/unittests/test_ds_identify.py | 72 |
1 files changed, 64 insertions, 8 deletions
diff --git a/tests/unittests/test_ds_identify.py b/tests/unittests/test_ds_identify.py index 53643989..ad7fe41e 100644 --- a/tests/unittests/test_ds_identify.py +++ b/tests/unittests/test_ds_identify.py @@ -1,5 +1,6 @@ # This file is part of cloud-init. See LICENSE file for license information. +from collections import namedtuple import copy import os from uuid import uuid4 @@ -7,7 +8,7 @@ from uuid import uuid4 from cloudinit import safeyaml from cloudinit import util from cloudinit.tests.helpers import ( - CiTestCase, dir2dict, populate_dir) + CiTestCase, dir2dict, populate_dir, populate_dir_with_ts) from cloudinit.sources import DataSourceIBMCloud as dsibm @@ -66,7 +67,6 @@ P_SYS_VENDOR = "sys/class/dmi/id/sys_vendor" P_SEED_DIR = "var/lib/cloud/seed" P_DSID_CFG = "etc/cloud/ds-identify.cfg" -IBM_PROVISIONING_CHECK_PATH = "/root/provisioningConfiguration.cfg" IBM_CONFIG_UUID = "9796-932E" MOCK_VIRT_IS_KVM = {'name': 'detect_virt', 'RET': 'kvm', 'ret': 0} @@ -74,11 +74,17 @@ MOCK_VIRT_IS_VMWARE = {'name': 'detect_virt', 'RET': 'vmware', 'ret': 0} MOCK_VIRT_IS_XEN = {'name': 'detect_virt', 'RET': 'xen', 'ret': 0} MOCK_UNAME_IS_PPC64 = {'name': 'uname', 'out': UNAME_PPC64EL, 'ret': 0} +shell_true = 0 +shell_false = 1 -class TestDsIdentify(CiTestCase): +CallReturn = namedtuple('CallReturn', + ['rc', 'stdout', 'stderr', 'cfg', 'files']) + + +class DsIdentifyBase(CiTestCase): dsid_path = os.path.realpath('tools/ds-identify') - def call(self, rootd=None, mocks=None, args=None, files=None, + def call(self, rootd=None, mocks=None, func="main", args=None, files=None, policy_dmi=DI_DEFAULT_POLICY, policy_no_dmi=DI_DEFAULT_POLICY_NO_DMI, ec2_strict_id=DI_EC2_STRICT_ID_DEFAULT): @@ -135,7 +141,7 @@ class TestDsIdentify(CiTestCase): mocklines.append(write_mock(d)) endlines = [ - 'main %s' % ' '.join(['"%s"' % s for s in args]) + func + ' ' + ' '.join(['"%s"' % s for s in args]) ] with open(wrap, "w") as fp: @@ -159,7 +165,7 @@ class TestDsIdentify(CiTestCase): cfg = {"_INVALID_YAML": contents, "_EXCEPTION": str(e)} - return rc, out, err, cfg, dir2dict(rootd) + return CallReturn(rc, out, err, cfg, dir2dict(rootd)) def _call_via_dict(self, data, rootd=None, **kwargs): # return output of self.call with a dict input like VALID_CFG[item] @@ -190,6 +196,8 @@ class TestDsIdentify(CiTestCase): _print_run_output(rc, out, err, cfg, files) return rc, out, err, cfg, files + +class TestDsIdentify(DsIdentifyBase): def test_wb_print_variables(self): """_print_info reports an array of discovered variables to stderr.""" data = VALID_CFG['Azure-dmi-detection'] @@ -250,7 +258,10 @@ class TestDsIdentify(CiTestCase): Template provisioning with user-data has METADATA disk, datasource should return not found.""" data = copy.deepcopy(VALID_CFG['IBMCloud-metadata']) - data['files'] = {IBM_PROVISIONING_CHECK_PATH: 'xxx'} + # change the 'is_ibm_provisioning' mock to return 1 (false) + isprov_m = [m for m in data['mocks'] + if m["name"] == "is_ibm_provisioning"][0] + isprov_m['ret'] = shell_true return self._check_via_dict(data, RC_NOT_FOUND) def test_ibmcloud_template_userdata(self): @@ -265,7 +276,8 @@ class TestDsIdentify(CiTestCase): no disks attached. Datasource should return not found.""" data = copy.deepcopy(VALID_CFG['IBMCloud-nodisks']) - data['files'] = {IBM_PROVISIONING_CHECK_PATH: 'xxx'} + data['mocks'].append( + {'name': 'is_ibm_provisioning', 'ret': shell_true}) return self._check_via_dict(data, RC_NOT_FOUND) def test_ibmcloud_template_no_userdata(self): @@ -446,6 +458,47 @@ class TestDsIdentify(CiTestCase): self._test_ds_found('Hetzner') +class TestIsIBMProvisioning(DsIdentifyBase): + """Test the is_ibm_provisioning method in ds-identify.""" + + inst_log = "/root/swinstall.log" + prov_cfg = "/root/provisioningConfiguration.cfg" + boot_ref = "/proc/1/environ" + funcname = "is_ibm_provisioning" + + def test_no_config(self): + """No provisioning config means not provisioning.""" + ret = self.call(files={}, func=self.funcname) + self.assertEqual(shell_false, ret.rc) + + def test_config_only(self): + """A provisioning config without a log means provisioning.""" + ret = self.call(files={self.prov_cfg: "key=value"}, func=self.funcname) + self.assertEqual(shell_true, ret.rc) + + def test_config_with_old_log(self): + """A config with a log from previous boot is not provisioning.""" + rootd = self.tmp_dir() + data = {self.prov_cfg: ("key=value\nkey2=val2\n", -10), + self.inst_log: ("log data\n", -30), + self.boot_ref: ("PWD=/", 0)} + populate_dir_with_ts(rootd, data) + ret = self.call(rootd=rootd, func=self.funcname) + self.assertEqual(shell_false, ret.rc) + self.assertIn("from previous boot", ret.stderr) + + def test_config_with_new_log(self): + """A config with a log from this boot is provisioning.""" + rootd = self.tmp_dir() + data = {self.prov_cfg: ("key=value\nkey2=val2\n", -10), + self.inst_log: ("log data\n", 30), + self.boot_ref: ("PWD=/", 0)} + populate_dir_with_ts(rootd, data) + ret = self.call(rootd=rootd, func=self.funcname) + self.assertEqual(shell_true, ret.rc) + self.assertIn("from current boot", ret.stderr) + + def blkid_out(disks=None): """Convert a list of disk dictionaries into blkid content.""" if disks is None: @@ -639,6 +692,7 @@ VALID_CFG = { 'ds': 'IBMCloud', 'mocks': [ MOCK_VIRT_IS_XEN, + {'name': 'is_ibm_provisioning', 'ret': shell_false}, {'name': 'blkid', 'ret': 0, 'out': blkid_out( [{'DEVNAME': 'xvda1', 'TYPE': 'vfat', 'PARTUUID': uuid4()}, @@ -652,6 +706,7 @@ VALID_CFG = { 'ds': 'IBMCloud', 'mocks': [ MOCK_VIRT_IS_XEN, + {'name': 'is_ibm_provisioning', 'ret': shell_false}, {'name': 'blkid', 'ret': 0, 'out': blkid_out( [{'DEVNAME': 'xvda1', 'TYPE': 'ext3', 'PARTUUID': uuid4(), @@ -669,6 +724,7 @@ VALID_CFG = { 'ds': 'IBMCloud', 'mocks': [ MOCK_VIRT_IS_XEN, + {'name': 'is_ibm_provisioning', 'ret': shell_false}, {'name': 'blkid', 'ret': 0, 'out': blkid_out( [{'DEVNAME': 'xvda1', 'TYPE': 'vfat', 'PARTUUID': uuid4()}, |