diff options
Diffstat (limited to 'tests/unittests/sources/test_vmware.py')
-rw-r--r-- | tests/unittests/sources/test_vmware.py | 389 |
1 files changed, 389 insertions, 0 deletions
diff --git a/tests/unittests/sources/test_vmware.py b/tests/unittests/sources/test_vmware.py new file mode 100644 index 00000000..dd331349 --- /dev/null +++ b/tests/unittests/sources/test_vmware.py @@ -0,0 +1,389 @@ +# Copyright (c) 2021 VMware, Inc. All Rights Reserved. +# +# Authors: Andrew Kutz <akutz@vmware.com> +# +# This file is part of cloud-init. See LICENSE file for license information. + +import base64 +import gzip +import os + +import pytest + +from cloudinit import dmi, helpers, safeyaml, settings +from cloudinit.sources import DataSourceVMware +from tests.unittests.helpers import ( + CiTestCase, + FilesystemMockingTestCase, + mock, + populate_dir, +) + +PRODUCT_NAME_FILE_PATH = "/sys/class/dmi/id/product_name" +PRODUCT_NAME = "VMware7,1" +PRODUCT_UUID = "82343CED-E4C7-423B-8F6B-0D34D19067AB" +REROOT_FILES = { + DataSourceVMware.PRODUCT_UUID_FILE_PATH: PRODUCT_UUID, + PRODUCT_NAME_FILE_PATH: PRODUCT_NAME, +} + +VMW_MULTIPLE_KEYS = [ + "ssh-rsa AAAAB3NzaC1yc2EAAAA... test1@vmw.com", + "ssh-rsa AAAAB3NzaC1yc2EAAAA... test2@vmw.com", +] +VMW_SINGLE_KEY = "ssh-rsa AAAAB3NzaC1yc2EAAAA... test@vmw.com" + +VMW_METADATA_YAML = """instance-id: cloud-vm +local-hostname: cloud-vm +network: + version: 2 + ethernets: + nics: + match: + name: ens* + dhcp4: yes +""" + +VMW_USERDATA_YAML = """## template: jinja +#cloud-config +users: +- default +""" + +VMW_VENDORDATA_YAML = """## template: jinja +#cloud-config +runcmd: +- echo "Hello, world." +""" + + +@pytest.fixture(autouse=True) +def common_patches(): + with mock.patch("cloudinit.util.platform.platform", return_value="Linux"): + with mock.patch.multiple( + "cloudinit.dmi", + is_container=mock.Mock(return_value=False), + is_FreeBSD=mock.Mock(return_value=False), + ): + yield + + +class TestDataSourceVMware(CiTestCase): + """ + Test common functionality that is not transport specific. + """ + + def setUp(self): + super(TestDataSourceVMware, self).setUp() + self.tmp = self.tmp_dir() + + def test_no_data_access_method(self): + ds = get_ds(self.tmp) + ds.vmware_rpctool = None + ret = ds.get_data() + self.assertFalse(ret) + + def test_get_host_info(self): + host_info = DataSourceVMware.get_host_info() + self.assertTrue(host_info) + self.assertTrue(host_info["hostname"]) + self.assertTrue(host_info["local-hostname"]) + self.assertTrue(host_info["local_hostname"]) + self.assertTrue(host_info[DataSourceVMware.LOCAL_IPV4]) + + +class TestDataSourceVMwareEnvVars(FilesystemMockingTestCase): + """ + Test the envvar transport. + """ + + def setUp(self): + super(TestDataSourceVMwareEnvVars, self).setUp() + self.tmp = self.tmp_dir() + os.environ[DataSourceVMware.VMX_GUESTINFO] = "1" + self.create_system_files() + + def tearDown(self): + del os.environ[DataSourceVMware.VMX_GUESTINFO] + return super(TestDataSourceVMwareEnvVars, self).tearDown() + + def create_system_files(self): + rootd = self.tmp_dir() + populate_dir( + rootd, + { + DataSourceVMware.PRODUCT_UUID_FILE_PATH: PRODUCT_UUID, + }, + ) + self.assertTrue(self.reRoot(rootd)) + + def assert_get_data_ok(self, m_fn, m_fn_call_count=6): + ds = get_ds(self.tmp) + ds.vmware_rpctool = None + ret = ds.get_data() + self.assertTrue(ret) + self.assertEqual(m_fn_call_count, m_fn.call_count) + self.assertEqual( + ds.data_access_method, DataSourceVMware.DATA_ACCESS_METHOD_ENVVAR + ) + return ds + + def assert_metadata(self, metadata, m_fn, m_fn_call_count=6): + ds = self.assert_get_data_ok(m_fn, m_fn_call_count) + assert_metadata(self, ds, metadata) + + @mock.patch( + "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value" + ) + def test_get_subplatform(self, m_fn): + m_fn.side_effect = [VMW_METADATA_YAML, "", "", "", "", ""] + ds = self.assert_get_data_ok(m_fn, m_fn_call_count=4) + self.assertEqual( + ds.subplatform, + "%s (%s)" + % ( + DataSourceVMware.DATA_ACCESS_METHOD_ENVVAR, + DataSourceVMware.get_guestinfo_envvar_key_name("metadata"), + ), + ) + + @mock.patch( + "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value" + ) + def test_get_data_metadata_only(self, m_fn): + m_fn.side_effect = [VMW_METADATA_YAML, "", "", "", "", ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + @mock.patch( + "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value" + ) + def test_get_data_userdata_only(self, m_fn): + m_fn.side_effect = ["", VMW_USERDATA_YAML, "", ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + @mock.patch( + "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value" + ) + def test_get_data_vendordata_only(self, m_fn): + m_fn.side_effect = ["", "", VMW_VENDORDATA_YAML, ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + @mock.patch( + "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value" + ) + def test_get_data_metadata_base64(self, m_fn): + data = base64.b64encode(VMW_METADATA_YAML.encode("utf-8")) + m_fn.side_effect = [data, "base64", "", ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + @mock.patch( + "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value" + ) + def test_get_data_metadata_b64(self, m_fn): + data = base64.b64encode(VMW_METADATA_YAML.encode("utf-8")) + m_fn.side_effect = [data, "b64", "", ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + @mock.patch( + "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value" + ) + def test_get_data_metadata_gzip_base64(self, m_fn): + data = VMW_METADATA_YAML.encode("utf-8") + data = gzip.compress(data) + data = base64.b64encode(data) + m_fn.side_effect = [data, "gzip+base64", "", ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + @mock.patch( + "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value" + ) + def test_get_data_metadata_gz_b64(self, m_fn): + data = VMW_METADATA_YAML.encode("utf-8") + data = gzip.compress(data) + data = base64.b64encode(data) + m_fn.side_effect = [data, "gz+b64", "", ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + @mock.patch( + "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value" + ) + def test_metadata_single_ssh_key(self, m_fn): + metadata = DataSourceVMware.load_json_or_yaml(VMW_METADATA_YAML) + metadata["public_keys"] = VMW_SINGLE_KEY + metadata_yaml = safeyaml.dumps(metadata) + m_fn.side_effect = [metadata_yaml, "", "", ""] + self.assert_metadata(metadata, m_fn, m_fn_call_count=4) + + @mock.patch( + "cloudinit.sources.DataSourceVMware.guestinfo_envvar_get_value" + ) + def test_metadata_multiple_ssh_keys(self, m_fn): + metadata = DataSourceVMware.load_json_or_yaml(VMW_METADATA_YAML) + metadata["public_keys"] = VMW_MULTIPLE_KEYS + metadata_yaml = safeyaml.dumps(metadata) + m_fn.side_effect = [metadata_yaml, "", "", ""] + self.assert_metadata(metadata, m_fn, m_fn_call_count=4) + + +class TestDataSourceVMwareGuestInfo(FilesystemMockingTestCase): + """ + Test the guestinfo transport on a VMware platform. + """ + + def setUp(self): + super(TestDataSourceVMwareGuestInfo, self).setUp() + self.tmp = self.tmp_dir() + self.create_system_files() + + def create_system_files(self): + rootd = self.tmp_dir() + populate_dir( + rootd, + { + DataSourceVMware.PRODUCT_UUID_FILE_PATH: PRODUCT_UUID, + PRODUCT_NAME_FILE_PATH: PRODUCT_NAME, + }, + ) + self.assertTrue(self.reRoot(rootd)) + + def assert_get_data_ok(self, m_fn, m_fn_call_count=6): + ds = get_ds(self.tmp) + ds.vmware_rpctool = "vmware-rpctool" + ret = ds.get_data() + self.assertTrue(ret) + self.assertEqual(m_fn_call_count, m_fn.call_count) + self.assertEqual( + ds.data_access_method, + DataSourceVMware.DATA_ACCESS_METHOD_GUESTINFO, + ) + return ds + + def assert_metadata(self, metadata, m_fn, m_fn_call_count=6): + ds = self.assert_get_data_ok(m_fn, m_fn_call_count) + assert_metadata(self, ds, metadata) + + def test_ds_valid_on_vmware_platform(self): + system_type = dmi.read_dmi_data("system-product-name") + self.assertEqual(system_type, PRODUCT_NAME) + + @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value") + def test_get_subplatform(self, m_fn): + m_fn.side_effect = [VMW_METADATA_YAML, "", "", "", "", ""] + ds = self.assert_get_data_ok(m_fn, m_fn_call_count=4) + self.assertEqual( + ds.subplatform, + "%s (%s)" + % ( + DataSourceVMware.DATA_ACCESS_METHOD_GUESTINFO, + DataSourceVMware.get_guestinfo_key_name("metadata"), + ), + ) + + @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value") + def test_get_data_userdata_only(self, m_fn): + m_fn.side_effect = ["", VMW_USERDATA_YAML, "", ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value") + def test_get_data_vendordata_only(self, m_fn): + m_fn.side_effect = ["", "", VMW_VENDORDATA_YAML, ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value") + def test_metadata_single_ssh_key(self, m_fn): + metadata = DataSourceVMware.load_json_or_yaml(VMW_METADATA_YAML) + metadata["public_keys"] = VMW_SINGLE_KEY + metadata_yaml = safeyaml.dumps(metadata) + m_fn.side_effect = [metadata_yaml, "", "", ""] + self.assert_metadata(metadata, m_fn, m_fn_call_count=4) + + @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value") + def test_metadata_multiple_ssh_keys(self, m_fn): + metadata = DataSourceVMware.load_json_or_yaml(VMW_METADATA_YAML) + metadata["public_keys"] = VMW_MULTIPLE_KEYS + metadata_yaml = safeyaml.dumps(metadata) + m_fn.side_effect = [metadata_yaml, "", "", ""] + self.assert_metadata(metadata, m_fn, m_fn_call_count=4) + + @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value") + def test_get_data_metadata_base64(self, m_fn): + data = base64.b64encode(VMW_METADATA_YAML.encode("utf-8")) + m_fn.side_effect = [data, "base64", "", ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value") + def test_get_data_metadata_b64(self, m_fn): + data = base64.b64encode(VMW_METADATA_YAML.encode("utf-8")) + m_fn.side_effect = [data, "b64", "", ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value") + def test_get_data_metadata_gzip_base64(self, m_fn): + data = VMW_METADATA_YAML.encode("utf-8") + data = gzip.compress(data) + data = base64.b64encode(data) + m_fn.side_effect = [data, "gzip+base64", "", ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value") + def test_get_data_metadata_gz_b64(self, m_fn): + data = VMW_METADATA_YAML.encode("utf-8") + data = gzip.compress(data) + data = base64.b64encode(data) + m_fn.side_effect = [data, "gz+b64", "", ""] + self.assert_get_data_ok(m_fn, m_fn_call_count=4) + + +class TestDataSourceVMwareGuestInfo_InvalidPlatform(FilesystemMockingTestCase): + """ + Test the guestinfo transport on a non-VMware platform. + """ + + def setUp(self): + super(TestDataSourceVMwareGuestInfo_InvalidPlatform, self).setUp() + self.tmp = self.tmp_dir() + self.create_system_files() + + def create_system_files(self): + rootd = self.tmp_dir() + populate_dir( + rootd, + { + DataSourceVMware.PRODUCT_UUID_FILE_PATH: PRODUCT_UUID, + }, + ) + self.assertTrue(self.reRoot(rootd)) + + @mock.patch("cloudinit.sources.DataSourceVMware.guestinfo_get_value") + def test_ds_invalid_on_non_vmware_platform(self, m_fn): + system_type = dmi.read_dmi_data("system-product-name") + self.assertEqual(system_type, None) + + m_fn.side_effect = [VMW_METADATA_YAML, "", "", "", "", ""] + ds = get_ds(self.tmp) + ds.vmware_rpctool = "vmware-rpctool" + ret = ds.get_data() + self.assertFalse(ret) + + +def assert_metadata(test_obj, ds, metadata): + test_obj.assertEqual(metadata.get("instance-id"), ds.get_instance_id()) + test_obj.assertEqual(metadata.get("local-hostname"), ds.get_hostname()) + + expected_public_keys = metadata.get("public_keys") + if not isinstance(expected_public_keys, list): + expected_public_keys = [expected_public_keys] + + test_obj.assertEqual(expected_public_keys, ds.get_public_ssh_keys()) + test_obj.assertIsInstance(ds.get_public_ssh_keys(), list) + + +def get_ds(temp_dir): + ds = DataSourceVMware.DataSourceVMware( + settings.CFG_BUILTIN, None, helpers.Paths({"run_dir": temp_dir}) + ) + ds.vmware_rpctool = "vmware-rpctool" + return ds + + +# vi: ts=4 expandtab |