# This file is part of cloud-init. See LICENSE file for license information. import copy import os from collections import namedtuple from uuid import uuid4 from cloudinit import safeyaml, subp, util from cloudinit.sources import DataSourceIBMCloud as ds_ibm from cloudinit.sources import DataSourceOracle as ds_oracle from cloudinit.sources import DataSourceSmartOS as ds_smartos from tests.unittests.helpers import ( CiTestCase, cloud_init_project_dir, dir2dict, populate_dir, populate_dir_with_ts, ) UNAME_MYSYS = ( "Linux bart 4.4.0-62-generic #83-Ubuntu " "SMP Wed Jan 18 14:10:15 UTC 2017 x86_64 GNU/Linux" ) UNAME_PPC64EL = ( "Linux diamond 4.4.0-83-generic #106-Ubuntu SMP " "Mon Jun 26 17:53:54 UTC 2017 " "ppc64le ppc64le ppc64le GNU/Linux" ) UNAME_FREEBSD = ( "FreeBSD fbsd12-1 12.1-RELEASE-p10 FreeBSD 12.1-RELEASE-p10 GENERIC amd64" ) BLKID_EFI_ROOT = """ DEVNAME=/dev/sda1 UUID=8B36-5390 TYPE=vfat PARTUUID=30d7c715-a6ae-46ee-b050-afc6467fc452 DEVNAME=/dev/sda2 UUID=19ac97d5-6973-4193-9a09-2e6bbfa38262 TYPE=ext4 PARTUUID=30c65c77-e07d-4039-b2fb-88b1fb5fa1fc """ # this is a Ubuntu 18.04 disk.img output (dual uefi and bios bootable) BLKID_UEFI_UBUNTU = [ {"DEVNAME": "vda1", "TYPE": "ext4", "PARTUUID": uuid4(), "UUID": uuid4()}, {"DEVNAME": "vda14", "PARTUUID": uuid4()}, { "DEVNAME": "vda15", "TYPE": "vfat", "LABEL": "UEFI", "PARTUUID": uuid4(), "UUID": "5F55-129B", }, ] POLICY_FOUND_ONLY = "search,found=all,maybe=none,notfound=disabled" POLICY_FOUND_OR_MAYBE = "search,found=all,maybe=all,notfound=disabled" DI_DEFAULT_POLICY = "search,found=all,maybe=all,notfound=disabled" DI_DEFAULT_POLICY_NO_DMI = "search,found=all,maybe=all,notfound=enabled" DI_EC2_STRICT_ID_DEFAULT = "true" OVF_MATCH_STRING = "http://schemas.dmtf.org/ovf/environment/1" SHELL_MOCK_TMPL = """\ %(name)s() { local out='%(out)s' err='%(err)s' r='%(ret)s' RET='%(RET)s' [ "$out" = "_unset" ] || echo "$out" [ "$err" = "_unset" ] || echo "$err" 2>&1 [ "$RET" = "_unset" ] || _RET="$RET" return $r } """ RC_FOUND = 0 RC_NOT_FOUND = 1 DS_NONE = "None" P_CHASSIS_ASSET_TAG = "sys/class/dmi/id/chassis_asset_tag" P_PRODUCT_NAME = "sys/class/dmi/id/product_name" P_PRODUCT_SERIAL = "sys/class/dmi/id/product_serial" P_PRODUCT_UUID = "sys/class/dmi/id/product_uuid" P_SYS_VENDOR = "sys/class/dmi/id/sys_vendor" P_SEED_DIR = "var/lib/cloud/seed" P_DSID_CFG = "etc/cloud/ds-identify.cfg" IBM_CONFIG_UUID = "9796-932E" MOCK_VIRT_IS_CONTAINER_OTHER = { "name": "detect_virt", "RET": "container-other", "ret": 0, } MOCK_VIRT_IS_KVM = {"name": "detect_virt", "RET": "kvm", "ret": 0} MOCK_VIRT_IS_VMWARE = {"name": "detect_virt", "RET": "vmware", "ret": 0} # currenty' SmartOS hypervisor "bhyve" is unknown by systemd-detect-virt. MOCK_VIRT_IS_VM_OTHER = {"name": "detect_virt", "RET": "vm-other", "ret": 0} MOCK_VIRT_IS_XEN = {"name": "detect_virt", "RET": "xen", "ret": 0} MOCK_UNAME_IS_PPC64 = {"name": "uname", "out": UNAME_PPC64EL, "ret": 0} MOCK_UNAME_IS_FREEBSD = {"name": "uname", "out": UNAME_FREEBSD, "ret": 0} shell_true = 0 shell_false = 1 CallReturn = namedtuple( "CallReturn", ["rc", "stdout", "stderr", "cfg", "files"] ) class DsIdentifyBase(CiTestCase): dsid_path = cloud_init_project_dir("tools/ds-identify") allowed_subp = ["sh"] def call( self, rootd=None, mocks=None, func="main", args=None, files=None, policy_dmi=DI_DEFAULT_POLICY, policy_no_dmi=DI_DEFAULT_POLICY_NO_DMI, ec2_strict_id=DI_EC2_STRICT_ID_DEFAULT, ): if args is None: args = [] if mocks is None: mocks = [] if files is None: files = {} if rootd is None: rootd = self.tmp_dir() unset = "_unset" wrap = self.tmp_path(path="_shwrap", dir=rootd) populate_dir(rootd, files) # DI_DEFAULT_POLICY* are declared always as to not rely # on the default in the code. This is because SRU releases change # the value in the code, and thus tests would fail there. head = [ "DI_MAIN=noop", "DEBUG_LEVEL=2", "DI_LOG=stderr", "PATH_ROOT='%s'" % rootd, ". " + self.dsid_path, 'DI_DEFAULT_POLICY="%s"' % policy_dmi, 'DI_DEFAULT_POLICY_NO_DMI="%s"' % policy_no_dmi, 'DI_EC2_STRICT_ID_DEFAULT="%s"' % ec2_strict_id, "", ] def write_mock(data): ddata = {"out": None, "err": None, "ret": 0, "RET": None} ddata.update(data) for k in ddata: if ddata[k] is None: ddata[k] = unset return SHELL_MOCK_TMPL % ddata mocklines = [] defaults = [ {"name": "detect_virt", "RET": "none", "ret": 1}, {"name": "uname", "out": UNAME_MYSYS}, {"name": "blkid", "out": BLKID_EFI_ROOT}, { "name": "ovf_vmware_transport_guestinfo", "out": "No value found", "ret": 1, }, { "name": "dmi_decode", "ret": 1, "err": "No dmidecode program. ERROR.", }, { "name": "get_kenv_field", "ret": 1, "err": "No kenv program. ERROR.", }, ] written = [d["name"] for d in mocks] for data in mocks: mocklines.append(write_mock(data)) for d in defaults: if d["name"] not in written: mocklines.append(write_mock(d)) endlines = [func + " " + " ".join(['"%s"' % s for s in args])] with open(wrap, "w") as fp: fp.write("\n".join(head + mocklines + endlines) + "\n") rc = 0 try: out, err = subp.subp(["sh", "-c", ". %s" % wrap], capture=True) except subp.ProcessExecutionError as e: rc = e.exit_code out = e.stdout err = e.stderr cfg = None cfg_out = os.path.join(rootd, "run/cloud-init/cloud.cfg") if os.path.exists(cfg_out): contents = util.load_file(cfg_out) try: cfg = safeyaml.load(contents) except Exception as e: cfg = {"_INVALID_YAML": contents, "_EXCEPTION": str(e)} return CallReturn(rc, out, err, cfg, dir2dict(rootd)) def _call_via_dict(self, data, rootd=None, **kwargs): # return output of self.call with a dict input like VALID_CFG[item] xwargs = {"rootd": rootd} passthrough = ( "mocks", "func", "args", "policy_dmi", "policy_no_dmi", "files", ) for k in passthrough: if k in data: xwargs[k] = data[k] if k in kwargs: xwargs[k] = kwargs[k] return self.call(**xwargs) def _test_ds_found(self, name): data = copy.deepcopy(VALID_CFG[name]) return self._check_via_dict( data, RC_FOUND, dslist=[data.get("ds"), DS_NONE] ) def _test_ds_not_found(self, name): data = copy.deepcopy(VALID_CFG[name]) return self._check_via_dict(data, RC_NOT_FOUND) def _check_via_dict(self, data, rc, dslist=None, **kwargs): ret = self._call_via_dict(data, **kwargs) good = False try: self.assertEqual(rc, ret.rc) if dslist is not None: self.assertEqual(dslist, ret.cfg["datasource_list"]) good = True finally: if not good: _print_run_output( ret.rc, ret.stdout, ret.stderr, ret.cfg, ret.files ) return ret class TestDsIdentify(DsIdentifyBase): def test_wb_print_variables(self): """_print_info reports an array of discovered variables to stderr.""" data = VALID_CFG["Azure-dmi-detection"] _, _, err, _, _ = self._call_via_dict(data) expected_vars = [ "DMI_PRODUCT_NAME", "DMI_SYS_VENDOR", "DMI_PRODUCT_SERIAL", "DMI_PRODUCT_UUID", "PID_1_PRODUCT_NAME", "DMI_CHASSIS_ASSET_TAG", "FS_LABELS", "KERNEL_CMDLINE", "VIRT", "UNAME_KERNEL_NAME", "UNAME_KERNEL_RELEASE", "UNAME_KERNEL_VERSION", "UNAME_MACHINE", "UNAME_NODENAME", "UNAME_OPERATING_SYSTEM", "DSNAME", "DSLIST", "MODE", "ON_FOUND", "ON_MAYBE", "ON_NOTFOUND", ] for var in expected_vars: self.assertIn("{0}=".format(var), err) def test_azure_dmi_detection_from_chassis_asset_tag(self): """Azure datasource is detected from DMI chassis-asset-tag""" self._test_ds_found("Azure-dmi-detection") def test_azure_seed_file_detection(self): """Azure datasource is detected due to presence of a seed file. The seed file tested is /var/lib/cloud/seed/azure/ovf-env.xml.""" self._test_ds_found("Azure-seed-detection") def test_aws_ec2_hvm(self): """EC2: hvm instances use dmi serial and uuid starting with 'ec2'.""" self._test_ds_found("Ec2-hvm") def test_aws_ec2_xen(self): """EC2: sys/hypervisor/uuid starts with ec2.""" self._test_ds_found("Ec2-xen") def test_brightbox_is_ec2(self): """EC2: product_serial ends with '.brightbox.com'""" self._test_ds_found("Ec2-brightbox") def test_bobrightbox_is_not_brightbox(self): """EC2: bobrightbox.com in product_serial is not brightbox'""" self._test_ds_not_found("Ec2-brightbox-negative") def test_freebsd_nocloud(self): """NoCloud identified on FreeBSD via label by geom.""" self._test_ds_found("NoCloud-fbsd") def test_gce_by_product_name(self): """GCE identifies itself with product_name.""" self._test_ds_found("GCE") def test_gce_by_serial(self): """Older gce compute instances must be identified by serial.""" self._test_ds_found("GCE-serial") def test_config_drive(self): """ConfigDrive datasource has a disk with LABEL=config-2.""" self._test_ds_found("ConfigDrive") def test_rbx_cloud(self): """Rbx datasource has a disk with LABEL=CLOUDMD.""" self._test_ds_found("RbxCloud") def test_rbx_cloud_lower(self): """Rbx datasource has a disk with LABEL=cloudmd.""" self._test_ds_found("RbxCloudLower") def test_config_drive_upper(self): """ConfigDrive datasource has a disk with LABEL=CONFIG-2.""" self._test_ds_found("ConfigDriveUpper") def test_config_drive_seed(self): """Config Drive seed directory.""" self._test_ds_found("ConfigDrive-seed") def test_config_drive_interacts_with_ibmcloud_config_disk(self): """Verify ConfigDrive interaction with IBMCloud. If ConfigDrive is enabled and not IBMCloud, then ConfigDrive should claim the ibmcloud 'config-2' disk. If IBMCloud is enabled, then ConfigDrive should skip.""" data = copy.deepcopy(VALID_CFG["IBMCloud-config-2"]) files = data.get("files", {}) if not files: data["files"] = files cfgpath = "etc/cloud/cloud.cfg.d/99_networklayer_common.cfg" # with list including IBMCloud, config drive should be not found. files[cfgpath] = "datasource_list: [ ConfigDrive, IBMCloud ]\n" ret = self._check_via_dict(data, shell_true) self.assertEqual(ret.cfg.get("datasource_list"), ["IBMCloud", "None"]) # But if IBMCloud is not enabled, config drive should claim this. files[cfgpath] = "datasource_list: [ ConfigDrive, NoCloud ]\n" ret = self._check_via_dict(data, shell_true) self.assertEqual( ret.cfg.get("datasource_list"), ["ConfigDrive", "None"] ) def test_ibmcloud_template_userdata_in_provisioning(self): """Template provisioned with user-data during provisioning stage. Template provisioning with user-data has METADATA disk, datasource should return not found.""" data = copy.deepcopy(VALID_CFG["IBMCloud-metadata"]) # change the 'is_ibm_provisioning' mock to return 1 (false) isprov_m = [ m for m in data["mocks"] if m["name"] == "is_ibm_provisioning" ][0] isprov_m["ret"] = shell_true return self._check_via_dict(data, RC_NOT_FOUND) def test_ibmcloud_template_userdata(self): """Template provisioned with user-data first boot. Template provisioning with user-data has METADATA disk. datasource should return found.""" self._test_ds_found("IBMCloud-metadata") def test_ibmcloud_template_no_userdata_in_provisioning(self): """Template provisioned with no user-data during provisioning. no disks attached. Datasource should return not found.""" data = copy.deepcopy(VALID_CFG["IBMCloud-nodisks"]) data["mocks"].append( {"name": "is_ibm_provisioning", "ret": shell_true} ) return self._check_via_dict(data, RC_NOT_FOUND) def test_ibmcloud_template_no_userdata(self): """Template provisioned with no user-data first boot. no disks attached. Datasource should return found.""" self._check_via_dict(VALID_CFG["IBMCloud-nodisks"], RC_NOT_FOUND) def test_ibmcloud_os_code(self): """Launched by os code always has config-2 disk.""" self._test_ds_found("IBMCloud-config-2") def test_ibmcloud_os_code_different_uuid(self): """IBM cloud config-2 disks must be explicit match on UUID. If the UUID is not 9796-932E then we actually expect ConfigDrive.""" data = copy.deepcopy(VALID_CFG["IBMCloud-config-2"]) offset = None for m, d in enumerate(data["mocks"]): if d.get("name") == "blkid": offset = m break if not offset: raise ValueError("Expected to find 'blkid' mock, but did not.") data["mocks"][offset]["out"] = d["out"].replace( ds_ibm.IBM_CONFIG_UUID, "DEAD-BEEF" ) self._check_via_dict( data, rc=RC_FOUND, dslist=["ConfigDrive", DS_NONE] ) def test_ibmcloud_with_nocloud_seed(self): """NoCloud seed should be preferred over IBMCloud. A nocloud seed should be preferred over IBMCloud even if enabled. Ubuntu 16.04 images have /seed/nocloud-net. LP: #1766401.""" data = copy.deepcopy(VALID_CFG["IBMCloud-config-2"]) files = data.get("files", {}) if not files: data["files"] = files files.update(VALID_CFG["NoCloud-seed"]["files"]) ret = self._check_via_dict(data, shell_true) self.assertEqual( ["NoCloud", "IBMCloud", "None"], ret.cfg.get("datasource_list") ) def test_ibmcloud_with_configdrive_seed(self): """ConfigDrive seed should be preferred over IBMCloud. A ConfigDrive seed should be preferred over IBMCloud even if enabled. Ubuntu 16.04 images have a fstab entry that mounts the METADATA disk into /seed/config_drive. LP: ##1766401.""" data = copy.deepcopy(VALID_CFG["IBMCloud-config-2"]) files = data.get("files", {}) if not files: data["files"] = files files.update(VALID_CFG["ConfigDrive-seed"]["files"]) ret = self._check_via_dict(data, shell_true) self.assertEqual( ["ConfigDrive", "IBMCloud", "None"], ret.cfg.get("datasource_list") ) def test_policy_disabled(self): """A Builtin policy of 'disabled' should return not found. Even though a search would find something, the builtin policy of disabled should cause the return of not found.""" mydata = copy.deepcopy(VALID_CFG["Ec2-hvm"]) self._check_via_dict(mydata, rc=RC_NOT_FOUND, policy_dmi="disabled") def test_policy_config_disable_overrides_builtin(self): """explicit policy: disabled in config file should cause not found.""" mydata = copy.deepcopy(VALID_CFG["Ec2-hvm"]) mydata["files"][P_DSID_CFG] = "\n".join(["policy: disabled", ""]) self._check_via_dict(mydata, rc=RC_NOT_FOUND) def test_single_entry_defines_datasource(self): """If config has a single entry in datasource_list, that is used. Test the valid Ec2-hvm, but provide a config file that specifies a single entry in datasource_list. The configured value should be used.""" mydata = copy.deepcopy(VALID_CFG["Ec2-hvm"]) cfgpath = "etc/cloud/cloud.cfg.d/myds.cfg" mydata["files"][cfgpath] = 'datasource_list: ["NoCloud"]\n' self._check_via_dict(mydata, rc=RC_FOUND, dslist=["NoCloud", DS_NONE]) def test_configured_list_with_none(self): """When datasource_list already contains None, None is not added. The explicitly configured datasource_list has 'None' in it. That should not have None automatically added.""" mydata = copy.deepcopy(VALID_CFG["GCE"]) cfgpath = "etc/cloud/cloud.cfg.d/myds.cfg" mydata["files"][cfgpath] = 'datasource_list: ["Ec2", "None"]\n' self._check_via_dict(mydata, rc=RC_FOUND, dslist=["Ec2", DS_NONE]) def test_aliyun_identified(self): """Test that Aliyun cloud is identified by product id.""" self._test_ds_found("AliYun") def test_aliyun_over_ec2(self): """Even if all other factors identified Ec2, AliYun should be used.""" mydata = copy.deepcopy(VALID_CFG["Ec2-xen"]) self._test_ds_found("AliYun") prod_name = VALID_CFG["AliYun"]["files"][P_PRODUCT_NAME] mydata["files"][P_PRODUCT_NAME] = prod_name policy = "search,found=first,maybe=none,notfound=disabled" self._check_via_dict( mydata, rc=RC_FOUND, dslist=["AliYun", DS_NONE], policy_dmi=policy ) def test_default_openstack_intel_is_found(self): """On Intel, openstack must be identified.""" self._test_ds_found("OpenStack") def test_openstack_open_telekom_cloud(self): """Open Telecom identification.""" self._test_ds_found("OpenStack-OpenTelekom") def test_openstack_sap_ccloud(self): """SAP Converged Cloud identification""" self._test_ds_found("OpenStack-SAPCCloud") def test_openstack_asset_tag_nova(self): """OpenStack identification via asset tag OpenStack Nova.""" self._test_ds_found("OpenStack-AssetTag-Nova") def test_openstack_asset_tag_copute(self): """OpenStack identification via asset tag OpenStack Compute.""" self._test_ds_found("OpenStack-AssetTag-Compute") def test_openstack_on_non_intel_is_maybe(self): """On non-Intel, openstack without dmi info is maybe. nova does not identify itself on platforms other than intel. https://bugs.launchpad.net/cloud-init/+bugs?field.tag=dsid-nova""" data = copy.deepcopy(VALID_CFG["OpenStack"]) del data["files"][P_PRODUCT_NAME] data.update( { "policy_dmi": POLICY_FOUND_OR_MAYBE, "policy_no_dmi": POLICY_FOUND_OR_MAYBE, } ) # this should show not found as default uname in tests is intel. # and intel openstack requires positive identification. self._check_via_dict(data, RC_NOT_FOUND, dslist=None) # updating the uname to ppc64 though should get a maybe. data.update({"mocks": [MOCK_VIRT_IS_KVM, MOCK_UNAME_IS_PPC64]}) (_, _, err, _, _) = self._check_via_dict( data, RC_FOUND, dslist=["OpenStack", "None"] ) self.assertIn("check for 'OpenStack' returned maybe", err) def test_default_ovf_is_found(self): """OVF is identified found when ovf/ovf-env.xml seed file exists.""" self._test_ds_found("OVF-seed") def test_default_ovf_with_detect_virt_none_not_found(self): """OVF identifies not found when detect_virt returns "none".""" self._check_via_dict( {"ds": "OVF"}, rc=RC_NOT_FOUND, policy_dmi="disabled" ) def test_default_ovf_returns_not_found_on_azure(self): """OVF datasource won't be found as false positive on Azure.""" ovfonazure = copy.deepcopy(VALID_CFG["OVF"]) # Set azure asset tag to assert OVF content not found ovfonazure["files"][ P_CHASSIS_ASSET_TAG ] = "7783-7084-3265-9085-8269-3286-77\n" self._check_via_dict(ovfonazure, RC_FOUND, dslist=["Azure", DS_NONE]) def test_ovf_on_vmware_iso_found_by_cdrom_with_ovf_schema_match(self): """OVF is identified when iso9660 cdrom path contains ovf schema.""" self._test_ds_found("OVF") def test_ovf_on_vmware_guestinfo_found(self): """OVF guest info is found on vmware.""" self._test_ds_found("OVF-guestinfo") def test_ovf_on_vmware_iso_found_when_vmware_customization(self): """OVF is identified when vmware customization is enabled.""" self._test_ds_found("OVF-vmware-customization") def test_ovf_on_vmware_iso_found_open_vm_tools_64(self): """OVF is identified when open-vm-tools installed in /usr/lib64.""" cust64 = copy.deepcopy(VALID_CFG["OVF-vmware-customization"]) p32 = "usr/lib/vmware-tools/plugins/vmsvc/libdeployPkgPlugin.so" open64 = "usr/lib64/open-vm-tools/plugins/vmsvc/libdeployPkgPlugin.so" cust64["files"][open64] = cust64["files"][p32] del cust64["files"][p32] return self._check_via_dict( cust64, RC_FOUND, dslist=[cust64.get("ds"), DS_NONE] ) def test_ovf_on_vmware_iso_found_open_vm_tools_x86_64_linux_gnu(self): """OVF is identified when open-vm-tools installed in /usr/lib/x86_64-linux-gnu.""" cust64 = copy.deepcopy(VALID_CFG["OVF-vmware-customization"]) p32 = "usr/lib/vmware-tools/plugins/vmsvc/libdeployPkgPlugin.so" x86 = ( "usr/lib/x86_64-linux-gnu/open-vm-tools/plugins/vmsvc/" "libdeployPkgPlugin.so" ) cust64["files"][x86] = cust64["files"][p32] del cust64["files"][p32] return self._check_via_dict( cust64, RC_FOUND, dslist=[cust64.get("ds"), DS_NONE] ) def test_ovf_on_vmware_iso_found_open_vm_tools_aarch64_linux_gnu(self): """OVF is identified when open-vm-tools installed in /usr/lib/aarch64-linux-gnu.""" cust64 = copy.deepcopy(VALID_CFG["OVF-vmware-customization"]) p32 = "usr/lib/vmware-tools/plugins/vmsvc/libdeployPkgPlugin.so" aarch64 = ( "usr/lib/aarch64-linux-gnu/open-vm-tools/plugins/vmsvc/" "libdeployPkgPlugin.so" ) cust64["files"][aarch64] = cust64["files"][p32] del cust64["files"][p32] return self._check_via_dict( cust64, RC_FOUND, dslist=[cust64.get("ds"), DS_NONE] ) def test_ovf_on_vmware_iso_found_by_cdrom_with_matching_fs_label(self): """OVF is identified by well-known iso9660 labels.""" ovf_cdrom_by_label = copy.deepcopy(VALID_CFG["OVF"]) # Unset matching cdrom ovf schema content ovf_cdrom_by_label["files"]["dev/sr0"] = "No content match" self._check_via_dict( ovf_cdrom_by_label, rc=RC_NOT_FOUND, policy_dmi="disabled" ) # Add recognized labels valid_ovf_labels = [ "ovf-transport", "OVF-TRANSPORT", "OVFENV", "ovfenv", "OVF ENV", "ovf env", ] for valid_ovf_label in valid_ovf_labels: ovf_cdrom_by_label["mocks"][0]["out"] = blkid_out( [ {"DEVNAME": "sda1", "TYPE": "ext4", "LABEL": "rootfs"}, { "DEVNAME": "sr0", "TYPE": "iso9660", "LABEL": valid_ovf_label, }, {"DEVNAME": "vda1", "TYPE": "ntfs", "LABEL": "data"}, ] ) self._check_via_dict( ovf_cdrom_by_label, rc=RC_FOUND, dslist=["OVF", DS_NONE] ) def test_ovf_on_vmware_iso_found_by_cdrom_with_different_size(self): """OVF is identified by well-known iso9660 labels.""" ovf_cdrom_with_size = copy.deepcopy(VALID_CFG["OVF"]) # Set cdrom size to 20480 (10MB in 512 byte units) ovf_cdrom_with_size["files"]["sys/class/block/sr0/size"] = "20480\n" self._check_via_dict( ovf_cdrom_with_size, rc=RC_NOT_FOUND, policy_dmi="disabled" ) # Set cdrom size to 204800 (100MB in 512 byte units) ovf_cdrom_with_size["files"]["sys/class/block/sr0/size"] = "204800\n" self._check_via_dict( ovf_cdrom_with_size, rc=RC_NOT_FOUND, policy_dmi="disabled" ) # Set cdrom size to 18432 (9MB in 512 byte units) ovf_cdrom_with_size["files"]["sys/class/block/sr0/size"] = "18432\n" self._check_via_dict( ovf_cdrom_with_size, rc=RC_FOUND, dslist=["OVF", DS_NONE] ) # Set cdrom size to 2048 (1MB in 512 byte units) ovf_cdrom_with_size["files"]["sys/class/block/sr0/size"] = "2048\n" self._check_via_dict( ovf_cdrom_with_size, rc=RC_FOUND, dslist=["OVF", DS_NONE] ) def test_default_nocloud_as_vdb_iso9660(self): """NoCloud is found with iso9660 filesystem on non-cdrom disk.""" self._test_ds_found("NoCloud") def test_nocloud_upper(self): """NoCloud is found with uppercase filesystem label.""" self._test_ds_found("NoCloudUpper") def test_nocloud_fatboot(self): """NoCloud fatboot label - LP: #184166.""" self._test_ds_found("NoCloud-fatboot") def test_nocloud_seed(self): """Nocloud seed directory.""" self._test_ds_found("NoCloud-seed") def test_nocloud_seed_ubuntu_core_writable(self): """Nocloud seed directory ubuntu core writable""" self._test_ds_found("NoCloud-seed-ubuntu-core") def test_hetzner_found(self): """Hetzner cloud is identified in sys_vendor.""" self._test_ds_found("Hetzner") def test_smartos_bhyve(self): """SmartOS cloud identified by SmartDC in dmi.""" self._test_ds_found("SmartOS-bhyve") def test_smartos_lxbrand(self): """SmartOS cloud identified on lxbrand container.""" self._test_ds_found("SmartOS-lxbrand") def test_smartos_lxbrand_requires_socket(self): """SmartOS cloud should not be identified if no socket file.""" mycfg = copy.deepcopy(VALID_CFG["SmartOS-lxbrand"]) del mycfg["files"][ds_smartos.METADATA_SOCKFILE] self._check_via_dict(mycfg, rc=RC_NOT_FOUND, policy_dmi="disabled") def test_path_env_gets_set_from_main(self): """PATH environment should always have some tokens when main is run. We explicitly call main as we want to ensure it updates PATH.""" cust = copy.deepcopy(VALID_CFG["NoCloud"]) rootd = self.tmp_dir() mpp = "main-printpath" pre = "MYPATH=" cust["files"][mpp] = ( 'PATH="/mycust/path"; main; r=$?; echo ' + pre + "$PATH; exit $r;" ) ret = self._check_via_dict( cust, RC_FOUND, func=".", args=[os.path.join(rootd, mpp)], rootd=rootd, ) match = [ line for line in ret.stdout.splitlines() if line.startswith(pre) ][0] toks = match.replace(pre, "").split(":") expected = ["/sbin", "/bin", "/usr/sbin", "/usr/bin", "/mycust/path"] self.assertEqual( expected, [p for p in expected if p in toks], "path did not have expected tokens", ) def test_zstack_is_ec2(self): """EC2: chassis asset tag ends with 'zstack.io'""" self._test_ds_found("Ec2-ZStack") def test_e24cloud_is_ec2(self): """EC2: e24cloud identified by sys_vendor""" self._test_ds_found("Ec2-E24Cloud") def test_e24cloud_not_active(self): """EC2: bobrightbox.com in product_serial is not brightbox'""" self._test_ds_not_found("Ec2-E24Cloud-negative") def test_vmware_no_valid_transports(self): """VMware: no valid transports""" self._test_ds_not_found("VMware-NoValidTransports") def test_vmware_envvar_no_data(self): """VMware: envvar transport no data""" self._test_ds_not_found("VMware-EnvVar-NoData") def test_vmware_envvar_no_virt_id(self): """VMware: envvar transport success if no virt id""" self._test_ds_found("VMware-EnvVar-NoVirtID") def test_vmware_envvar_activated_by_metadata(self): """VMware: envvar transport activated by metadata""" self._test_ds_found("VMware-EnvVar-Metadata") def test_vmware_envvar_activated_by_userdata(self): """VMware: envvar transport activated by userdata""" self._test_ds_found("VMware-EnvVar-Userdata") def test_vmware_envvar_activated_by_vendordata(self): """VMware: envvar transport activated by vendordata""" self._test_ds_found("VMware-EnvVar-Vendordata") def test_vmware_guestinfo_no_data(self): """VMware: guestinfo transport no data""" self._test_ds_not_found("VMware-GuestInfo-NoData") def test_vmware_guestinfo_no_virt_id(self): """VMware: guestinfo transport fails if no virt id""" self._test_ds_not_found("VMware-GuestInfo-NoVirtID") def test_vmware_guestinfo_activated_by_metadata(self): """VMware: guestinfo transport activated by metadata""" self._test_ds_found("VMware-GuestInfo-Metadata") def test_vmware_guestinfo_activated_by_userdata(self): """VMware: guestinfo transport activated by userdata""" self._test_ds_found("VMware-GuestInfo-Userdata") def test_vmware_guestinfo_activated_by_vendordata(self): """VMware: guestinfo transport activated by vendordata""" self._test_ds_found("VMware-GuestInfo-Vendordata") class TestBSDNoSys(DsIdentifyBase): """Test *BSD code paths FreeBSD doesn't have /sys so we use kenv(1) here. Other BSD systems fallback to dmidecode(8). BSDs also doesn't have systemd-detect-virt(8), so we use sysctl(8) to query kern.vm_guest, and optionally map it""" def test_dmi_kenv(self): """Test that kenv(1) works on systems which don't have /sys This will be used on FreeBSD systems. """ self._test_ds_found("Hetzner-kenv") def test_dmi_dmidecode(self): """Test that dmidecode(8) works on systems which don't have /sys This will be used on all other BSD systems. """ self._test_ds_found("Hetzner-dmidecode") class TestIsIBMProvisioning(DsIdentifyBase): """Test the is_ibm_provisioning method in ds-identify.""" inst_log = "/root/swinstall.log" prov_cfg = "/root/provisioningConfiguration.cfg" boot_ref = "/proc/1/environ" funcname = "is_ibm_provisioning" def test_no_config(self): """No provisioning config means not provisioning.""" ret = self.call(files={}, func=self.funcname) self.assertEqual(shell_false, ret.rc) def test_config_only(self): """A provisioning config without a log means provisioning.""" ret = self.call(files={self.prov_cfg: "key=value"}, func=self.funcname) self.assertEqual(shell_true, ret.rc) def test_config_with_old_log(self): """A config with a log from previous boot is not provisioning.""" rootd = self.tmp_dir() data = { self.prov_cfg: ("key=value\nkey2=val2\n", -10), self.inst_log: ("log data\n", -30), self.boot_ref: ("PWD=/", 0), } populate_dir_with_ts(rootd, data) ret = self.call(rootd=rootd, func=self.funcname) self.assertEqual(shell_false, ret.rc) self.assertIn("from previous boot", ret.stderr) def test_config_with_new_log(self): """A config with a log from this boot is provisioning.""" rootd = self.tmp_dir() data = { self.prov_cfg: ("key=value\nkey2=val2\n", -10), self.inst_log: ("log data\n", 30), self.boot_ref: ("PWD=/", 0), } populate_dir_with_ts(rootd, data) ret = self.call(rootd=rootd, func=self.funcname) self.assertEqual(shell_true, ret.rc) self.assertIn("from current boot", ret.stderr) class TestOracle(DsIdentifyBase): def test_found_by_chassis(self): """Simple positive test of Oracle by chassis id.""" self._test_ds_found("Oracle") def test_not_found(self): """Simple negative test of Oracle.""" mycfg = copy.deepcopy(VALID_CFG["Oracle"]) mycfg["files"][P_CHASSIS_ASSET_TAG] = "Not Oracle" self._check_via_dict(mycfg, rc=RC_NOT_FOUND) def blkid_out(disks=None): """Convert a list of disk dictionaries into blkid content.""" if disks is None: disks = [] lines = [] for disk in disks: if not disk["DEVNAME"].startswith("/dev/"): disk["DEVNAME"] = "/dev/" + disk["DEVNAME"] # devname needs to be first. lines.append("%s=%s" % ("DEVNAME", disk["DEVNAME"])) for key in [d for d in disk if d != "DEVNAME"]: lines.append("%s=%s" % (key, disk[key])) lines.append("") return "\n".join(lines) def geom_out(disks=None): """Convert a list of disk dictionaries into geom content. geom called with -a (provider) and -s (script-friendly), will produce the following output: gpt/gptboot0 N/A vtbd1p1 gpt/swap0 N/A vtbd1p2 iso9660/cidata N/A vtbd2 """ if disks is None: disks = [] lines = [] for disk in disks: lines.append( "%s/%s N/A %s" % (disk["TYPE"], disk["LABEL"], disk["DEVNAME"]) ) lines.append("") return "\n".join(lines) def _print_run_output(rc, out, err, cfg, files): """A helper to print return of TestDsIdentify. _print_run_output(self.call())""" print( "\n".join( [ "-- rc = %s --" % rc, "-- out --", str(out), "-- err --", str(err), "-- cfg --", util.json_dumps(cfg), ] ) ) print("-- files --") for k, v in files.items(): if "/_shwrap" in k: continue print(" === %s ===" % k) for line in v.splitlines(): print(" " + line) VALID_CFG = { "AliYun": { "ds": "AliYun", "files": {P_PRODUCT_NAME: "Alibaba Cloud ECS\n"}, }, "Azure-dmi-detection": { "ds": "Azure", "files": { P_CHASSIS_ASSET_TAG: "7783-7084-3265-9085-8269-3286-77\n", }, }, "Azure-seed-detection": { "ds": "Azure", "files": { P_CHASSIS_ASSET_TAG: "No-match\n", os.path.join(P_SEED_DIR, "azure", "ovf-env.xml"): "present\n", }, }, "Ec2-hvm": { "ds": "Ec2", "mocks": [{"name": "detect_virt", "RET": "kvm", "ret": 0}], "files": { P_PRODUCT_SERIAL: "ec23aef5-54be-4843-8d24-8c819f88453e\n", P_PRODUCT_UUID: "EC23AEF5-54BE-4843-8D24-8C819F88453E\n", }, }, "Ec2-xen": { "ds": "Ec2", "mocks": [MOCK_VIRT_IS_XEN], "files": { "sys/hypervisor/uuid": "ec2c6e2f-5fac-4fc7-9c82-74127ec14bbb\n" }, }, "Ec2-brightbox": { "ds": "Ec2", "files": {P_PRODUCT_SERIAL: "srv-otuxg.gb1.brightbox.com\n"}, }, "Ec2-brightbox-negative": { "ds": "Ec2", "files": {P_PRODUCT_SERIAL: "tricky-host.bobrightbox.com\n"}, }, "GCE": { "ds": "GCE", "files": {P_PRODUCT_NAME: "Google Compute Engine\n"}, "mocks": [MOCK_VIRT_IS_KVM], }, "GCE-serial": { "ds": "GCE", "files": {P_PRODUCT_SERIAL: "GoogleCloud-8f2e88f\n"}, "mocks": [MOCK_VIRT_IS_KVM], }, "NoCloud": { "ds": "NoCloud", "mocks": [ MOCK_VIRT_IS_KVM, { "name": "blkid", "ret": 0, "out": blkid_out( BLKID_UEFI_UBUNTU + [ { "DEVNAME": "vdb", "TYPE": "iso9660", "LABEL": "cidata", } ] ), }, ], "files": { "dev/vdb": "pretend iso content for cidata\n", }, }, "NoCloud-fbsd": { "ds": "NoCloud", "mocks": [ MOCK_VIRT_IS_KVM, MOCK_UNAME_IS_FREEBSD, { "name": "geom", "ret": 0, "out": geom_out( [{"DEVNAME": "vtbd", "TYPE": "iso9660", "LABEL": "cidata"}] ), }, ], "files": { "/dev/vtdb": "pretend iso content for cidata\n", }, }, "NoCloudUpper": { "ds": "NoCloud", "mocks": [ MOCK_VIRT_IS_KVM, { "name": "blkid", "ret": 0, "out": blkid_out( BLKID_UEFI_UBUNTU + [ { "DEVNAME": "vdb", "TYPE": "iso9660", "LABEL": "CIDATA", } ] ), }, ], "files": { "dev/vdb": "pretend iso content for cidata\n", }, }, "NoCloud-fatboot": { "ds": "NoCloud", "mocks": [ MOCK_VIRT_IS_XEN, { "name": "blkid", "ret": 0, "out": blkid_out( BLKID_UEFI_UBUNTU + [ { "DEVNAME": "xvdb", "TYPE": "vfat", "SEC_TYPE": "msdos", "UUID": "355a-4FC2", "LABEL_FATBOOT": "cidata", } ] ), }, ], "files": { "dev/vdb": "pretend iso content for cidata\n", }, }, "NoCloud-seed": { "ds": "NoCloud", "files": { os.path.join(P_SEED_DIR, "nocloud", "user-data"): "ud\n", os.path.join(P_SEED_DIR, "nocloud", "meta-data"): "md\n", }, }, "NoCloud-seed-ubuntu-core": { "ds": "NoCloud", "files": { os.path.join( "writable/system-data", P_SEED_DIR, "nocloud-net", "user-data" ): "ud\n", os.path.join( "writable/system-data", P_SEED_DIR, "nocloud-net", "meta-data" ): "md\n", }, }, "OpenStack": { "ds": "OpenStack", "files": {P_PRODUCT_NAME: "OpenStack Nova\n"}, "mocks": [MOCK_VIRT_IS_KVM], "policy_dmi": POLICY_FOUND_ONLY, "policy_no_dmi": POLICY_FOUND_ONLY, }, "OpenStack-OpenTelekom": { # OTC gen1 (Xen) hosts use OpenStack datasource, LP: #1756471 "ds": "OpenStack", "files": {P_CHASSIS_ASSET_TAG: "OpenTelekomCloud\n"}, "mocks": [MOCK_VIRT_IS_XEN], }, "OpenStack-SAPCCloud": { # SAP CCloud hosts use OpenStack on VMware "ds": "OpenStack", "files": {P_CHASSIS_ASSET_TAG: "SAP CCloud VM\n"}, "mocks": [MOCK_VIRT_IS_VMWARE], }, "OpenStack-AssetTag-Nova": { # VMware vSphere can't modify product-name, LP: #1669875 "ds": "OpenStack", "files": {P_CHASSIS_ASSET_TAG: "OpenStack Nova\n"}, "mocks": [MOCK_VIRT_IS_XEN], }, "OpenStack-AssetTag-Compute": { # VMware vSphere can't modify product-name, LP: #1669875 "ds": "OpenStack", "files": {P_CHASSIS_ASSET_TAG: "OpenStack Compute\n"}, "mocks": [MOCK_VIRT_IS_XEN], }, "OVF-seed": { "ds": "OVF", "files": { os.path.join(P_SEED_DIR, "ovf", "ovf-env.xml"): "present\n", }, }, "OVF-vmware-customization": { "ds": "OVF", "mocks": [ # Include a mockes iso9660 potential, even though content not ovf { "name": "blkid", "ret": 0, "out": blkid_out( [{"DEVNAME": "sr0", "TYPE": "iso9660", "LABEL": ""}] ), }, MOCK_VIRT_IS_VMWARE, ], "files": { "dev/sr0": "no match", # Setup vmware customization enabled "usr/lib/vmware-tools/plugins/vmsvc/libdeployPkgPlugin.so": "here", "etc/cloud/cloud.cfg": "disable_vmware_customization: false\n", }, }, "OVF": { "ds": "OVF", "mocks": [ { "name": "blkid", "ret": 0, "out": blkid_out( [ {"DEVNAME": "sr0", "TYPE": "iso9660", "LABEL": ""}, { "DEVNAME": "sr1", "TYPE": "iso9660", "LABEL": "ignoreme", }, { "DEVNAME": "vda1", "TYPE": "vfat", "PARTUUID": uuid4(), }, ] ), }, MOCK_VIRT_IS_VMWARE, ], "files": { "dev/sr0": "pretend ovf iso has " + OVF_MATCH_STRING + "\n", "sys/class/block/sr0/size": "2048\n", }, }, "OVF-guestinfo": { "ds": "OVF", "mocks": [ { "name": "ovf_vmware_transport_guestinfo", "ret": 0, "out": '\n