# Copyright (C) 2016 Canonical Ltd. # # Author: Scott Moser # # This file is part of cloud-init. See LICENSE file for license information. import base64 import os from collections import OrderedDict from textwrap import dedent from cloudinit import subp from cloudinit import util from cloudinit.tests.helpers import CiTestCase, mock, wrap_and_call from cloudinit.helpers import Paths from cloudinit.sources import DataSourceOVF as dsovf from cloudinit.sources.helpers.vmware.imc.config_custom_script import ( CustomScriptNotFound) MPATH = 'cloudinit.sources.DataSourceOVF.' NOT_FOUND = None OVF_ENV_CONTENT = """ ESX Server 3.0.1 VMware, Inc. en_US {properties} """ def fill_properties(props, template=OVF_ENV_CONTENT): lines = [] prop_tmpl = '' for key, val in props.items(): lines.append(prop_tmpl.format(key=key, val=val)) indent = " " properties = ''.join([indent + line + "\n" for line in lines]) return template.format(properties=properties) class TestReadOvfEnv(CiTestCase): def test_with_b64_userdata(self): user_data = "#!/bin/sh\necho hello world\n" user_data_b64 = base64.b64encode(user_data.encode()).decode() props = {"user-data": user_data_b64, "password": "passw0rd", "instance-id": "inst-001"} env = fill_properties(props) md, ud, cfg = dsovf.read_ovf_environment(env) self.assertEqual({"instance-id": "inst-001"}, md) self.assertEqual(user_data.encode(), ud) self.assertEqual({'password': "passw0rd"}, cfg) def test_with_non_b64_userdata(self): user_data = "my-user-data" props = {"user-data": user_data, "instance-id": "inst-001"} env = fill_properties(props) md, ud, cfg = dsovf.read_ovf_environment(env) self.assertEqual({"instance-id": "inst-001"}, md) self.assertEqual(user_data.encode(), ud) self.assertEqual({}, cfg) def test_with_no_userdata(self): props = {"password": "passw0rd", "instance-id": "inst-001"} env = fill_properties(props) md, ud, cfg = dsovf.read_ovf_environment(env) self.assertEqual({"instance-id": "inst-001"}, md) self.assertEqual({'password': "passw0rd"}, cfg) self.assertIsNone(ud) class TestMarkerFiles(CiTestCase): def setUp(self): super(TestMarkerFiles, self).setUp() self.tdir = self.tmp_dir() def test_false_when_markerid_none(self): """Return False when markerid provided is None.""" self.assertFalse( dsovf.check_marker_exists(markerid=None, marker_dir=self.tdir)) def test_markerid_file_exist(self): """Return False when markerid file path does not exist, True otherwise.""" self.assertFalse( dsovf.check_marker_exists('123', self.tdir)) marker_file = self.tmp_path('.markerfile-123.txt', self.tdir) util.write_file(marker_file, '') self.assertTrue( dsovf.check_marker_exists('123', self.tdir) ) def test_marker_file_setup(self): """Test creation of marker files.""" markerfilepath = self.tmp_path('.markerfile-hi.txt', self.tdir) self.assertFalse(os.path.exists(markerfilepath)) dsovf.setup_marker_files(markerid='hi', marker_dir=self.tdir) self.assertTrue(os.path.exists(markerfilepath)) class TestDatasourceOVF(CiTestCase): with_logs = True def setUp(self): super(TestDatasourceOVF, self).setUp() self.datasource = dsovf.DataSourceOVF self.tdir = self.tmp_dir() def test_get_data_false_on_none_dmi_data(self): """When dmi for system-product-name is None, get_data returns False.""" paths = Paths({'cloud_dir': self.tdir}) ds = self.datasource(sys_cfg={}, distro={}, paths=paths) retcode = wrap_and_call( 'cloudinit.sources.DataSourceOVF', {'util.read_dmi_data': None, 'transport_iso9660': NOT_FOUND, 'transport_vmware_guestinfo': NOT_FOUND}, ds.get_data) self.assertFalse(retcode, 'Expected False return from ds.get_data') self.assertIn( 'DEBUG: No system-product-name found', self.logs.getvalue()) def test_get_data_no_vmware_customization_disabled(self): """When vmware customization is disabled via sys_cfg log a message.""" paths = Paths({'cloud_dir': self.tdir}) ds = self.datasource( sys_cfg={'disable_vmware_customization': True}, distro={}, paths=paths) retcode = wrap_and_call( 'cloudinit.sources.DataSourceOVF', {'util.read_dmi_data': 'vmware', 'transport_iso9660': NOT_FOUND, 'transport_vmware_guestinfo': NOT_FOUND}, ds.get_data) self.assertFalse(retcode, 'Expected False return from ds.get_data') self.assertIn( 'DEBUG: Customization for VMware platform is disabled.', self.logs.getvalue()) def test_get_data_vmware_customization_disabled(self): """When cloud-init workflow for vmware is enabled via sys_cfg log a message. """ paths = Paths({'cloud_dir': self.tdir}) ds = self.datasource( sys_cfg={'disable_vmware_customization': False}, distro={}, paths=paths) conf_file = self.tmp_path('test-cust', self.tdir) conf_content = dedent("""\ [CUSTOM-SCRIPT] SCRIPT-NAME = test-script [MISC] MARKER-ID = 12345345 """) util.write_file(conf_file, conf_content) with mock.patch(MPATH + 'get_tools_config', return_value='true'): with self.assertRaises(CustomScriptNotFound) as context: wrap_and_call( 'cloudinit.sources.DataSourceOVF', {'util.read_dmi_data': 'vmware', 'util.del_dir': True, 'search_file': self.tdir, 'wait_for_imc_cfg_file': conf_file, 'get_nics_to_enable': ''}, ds.get_data) customscript = self.tmp_path('test-script', self.tdir) self.assertIn('Script %s not found!!' % customscript, str(context.exception)) def test_get_data_cust_script_disabled(self): """If custom script is disabled by VMware tools configuration, raise a RuntimeError. """ paths = Paths({'cloud_dir': self.tdir}) ds = self.datasource( sys_cfg={'disable_vmware_customization': False}, distro={}, paths=paths) # Prepare the conf file conf_file = self.tmp_path('test-cust', self.tdir) conf_content = dedent("""\ [CUSTOM-SCRIPT] SCRIPT-NAME = test-script [MISC] MARKER-ID = 12345346 """) util.write_file(conf_file, conf_content) # Prepare the custom sript customscript = self.tmp_path('test-script', self.tdir) util.write_file(customscript, "This is the post cust script") with mock.patch(MPATH + 'get_tools_config', return_value='invalid'): with mock.patch(MPATH + 'set_customization_status', return_value=('msg', b'')): with self.assertRaises(RuntimeError) as context: wrap_and_call( 'cloudinit.sources.DataSourceOVF', {'util.read_dmi_data': 'vmware', 'util.del_dir': True, 'search_file': self.tdir, 'wait_for_imc_cfg_file': conf_file, 'get_nics_to_enable': ''}, ds.get_data) self.assertIn('Custom script is disabled by VM Administrator', str(context.exception)) def test_get_data_non_vmware_seed_platform_info(self): """Platform info properly reports when on non-vmware platforms.""" paths = Paths({'cloud_dir': self.tdir, 'run_dir': self.tdir}) # Write ovf-env.xml seed file seed_dir = self.tmp_path('seed', dir=self.tdir) ovf_env = self.tmp_path('ovf-env.xml', dir=seed_dir) util.write_file(ovf_env, OVF_ENV_CONTENT) ds = self.datasource(sys_cfg={}, distro={}, paths=paths) self.assertEqual('ovf', ds.cloud_name) self.assertEqual('ovf', ds.platform_type) with mock.patch(MPATH + 'util.read_dmi_data', return_value='!VMware'): with mock.patch(MPATH + 'transport_vmware_guestinfo') as m_guestd: with mock.patch(MPATH + 'transport_iso9660') as m_iso9660: m_iso9660.return_value = NOT_FOUND m_guestd.return_value = NOT_FOUND self.assertTrue(ds.get_data()) self.assertEqual( 'ovf (%s/seed/ovf-env.xml)' % self.tdir, ds.subplatform) def test_get_data_vmware_seed_platform_info(self): """Platform info properly reports when on VMware platform.""" paths = Paths({'cloud_dir': self.tdir, 'run_dir': self.tdir}) # Write ovf-env.xml seed file seed_dir = self.tmp_path('seed', dir=self.tdir) ovf_env = self.tmp_path('ovf-env.xml', dir=seed_dir) util.write_file(ovf_env, OVF_ENV_CONTENT) ds = self.datasource(sys_cfg={}, distro={}, paths=paths) self.assertEqual('ovf', ds.cloud_name) self.assertEqual('ovf', ds.platform_type) with mock.patch(MPATH + 'util.read_dmi_data', return_value='VMWare'): with mock.patch(MPATH + 'transport_vmware_guestinfo') as m_guestd: with mock.patch(MPATH + 'transport_iso9660') as m_iso9660: m_iso9660.return_value = NOT_FOUND m_guestd.return_value = NOT_FOUND self.assertTrue(ds.get_data()) self.assertEqual( 'vmware (%s/seed/ovf-env.xml)' % self.tdir, ds.subplatform) class TestTransportIso9660(CiTestCase): def setUp(self): super(TestTransportIso9660, self).setUp() self.add_patch('cloudinit.util.find_devs_with', 'm_find_devs_with') self.add_patch('cloudinit.util.mounts', 'm_mounts') self.add_patch('cloudinit.util.mount_cb', 'm_mount_cb') self.add_patch('cloudinit.sources.DataSourceOVF.get_ovf_env', 'm_get_ovf_env') self.m_get_ovf_env.return_value = ('myfile', 'mycontent') def test_find_already_mounted(self): """Check we call get_ovf_env from on matching mounted devices""" mounts = { '/dev/sr9': { 'fstype': 'iso9660', 'mountpoint': 'wark/media/sr9', 'opts': 'ro', } } self.m_mounts.return_value = mounts self.assertEqual("mycontent", dsovf.transport_iso9660()) def test_find_already_mounted_skips_non_iso9660(self): """Check we call get_ovf_env ignoring non iso9660""" mounts = { '/dev/xvdb': { 'fstype': 'vfat', 'mountpoint': 'wark/foobar', 'opts': 'defaults,noatime', }, '/dev/xvdc': { 'fstype': 'iso9660', 'mountpoint': 'wark/media/sr9', 'opts': 'ro', } } # We use an OrderedDict here to ensure we check xvdb before xvdc # as we're not mocking the regex matching, however, if we place # an entry in the results then we can be reasonably sure that # we're skipping an entry which fails to match. self.m_mounts.return_value = ( OrderedDict(sorted(mounts.items(), key=lambda t: t[0]))) self.assertEqual("mycontent", dsovf.transport_iso9660()) def test_find_already_mounted_matches_kname(self): """Check we dont regex match on basename of the device""" mounts = { '/dev/foo/bar/xvdc': { 'fstype': 'iso9660', 'mountpoint': 'wark/media/sr9', 'opts': 'ro', } } # we're skipping an entry which fails to match. self.m_mounts.return_value = mounts self.assertEqual(NOT_FOUND, dsovf.transport_iso9660()) def test_mount_cb_called_on_blkdevs_with_iso9660(self): """Check we call mount_cb on blockdevs with iso9660 only""" self.m_mounts.return_value = {} self.m_find_devs_with.return_value = ['/dev/sr0'] self.m_mount_cb.return_value = ("myfile", "mycontent") self.assertEqual("mycontent", dsovf.transport_iso9660()) self.m_mount_cb.assert_called_with( "/dev/sr0", dsovf.get_ovf_env, mtype="iso9660") def test_mount_cb_called_on_blkdevs_with_iso9660_check_regex(self): """Check we call mount_cb on blockdevs with iso9660 and match regex""" self.m_mounts.return_value = {} self.m_find_devs_with.return_value = [ '/dev/abc', '/dev/my-cdrom', '/dev/sr0'] self.m_mount_cb.return_value = ("myfile", "mycontent") self.assertEqual("mycontent", dsovf.transport_iso9660()) self.m_mount_cb.assert_called_with( "/dev/sr0", dsovf.get_ovf_env, mtype="iso9660") def test_mount_cb_not_called_no_matches(self): """Check we don't call mount_cb if nothing matches""" self.m_mounts.return_value = {} self.m_find_devs_with.return_value = ['/dev/vg/myovf'] self.assertEqual(NOT_FOUND, dsovf.transport_iso9660()) self.assertEqual(0, self.m_mount_cb.call_count) def test_mount_cb_called_require_iso_false(self): """Check we call mount_cb on blockdevs with require_iso=False""" self.m_mounts.return_value = {} self.m_find_devs_with.return_value = ['/dev/xvdz'] self.m_mount_cb.return_value = ("myfile", "mycontent") self.assertEqual( "mycontent", dsovf.transport_iso9660(require_iso=False)) self.m_mount_cb.assert_called_with( "/dev/xvdz", dsovf.get_ovf_env, mtype=None) def test_maybe_cdrom_device_none(self): """Test maybe_cdrom_device returns False for none/empty input""" self.assertFalse(dsovf.maybe_cdrom_device(None)) self.assertFalse(dsovf.maybe_cdrom_device('')) def test_maybe_cdrom_device_non_string_exception(self): """Test maybe_cdrom_device raises ValueError on non-string types""" with self.assertRaises(ValueError): dsovf.maybe_cdrom_device({'a': 'eleven'}) def test_maybe_cdrom_device_false_on_multi_dir_paths(self): """Test maybe_cdrom_device is false on /dev[/.*]/* paths""" self.assertFalse(dsovf.maybe_cdrom_device('/dev/foo/sr0')) self.assertFalse(dsovf.maybe_cdrom_device('foo/sr0')) self.assertFalse(dsovf.maybe_cdrom_device('../foo/sr0')) self.assertFalse(dsovf.maybe_cdrom_device('../foo/sr0')) def test_maybe_cdrom_device_true_on_hd_partitions(self): """Test maybe_cdrom_device is false on /dev/hd[a-z][0-9]+ paths""" self.assertTrue(dsovf.maybe_cdrom_device('/dev/hda1')) self.assertTrue(dsovf.maybe_cdrom_device('hdz9')) def test_maybe_cdrom_device_true_on_valid_relative_paths(self): """Test maybe_cdrom_device normalizes paths""" self.assertTrue(dsovf.maybe_cdrom_device('/dev/wark/../sr9')) self.assertTrue(dsovf.maybe_cdrom_device('///sr0')) self.assertTrue(dsovf.maybe_cdrom_device('/sr0')) self.assertTrue(dsovf.maybe_cdrom_device('//dev//hda')) def test_maybe_cdrom_device_true_on_xvd_partitions(self): """Test maybe_cdrom_device returns true on xvd*""" self.assertTrue(dsovf.maybe_cdrom_device('/dev/xvda')) self.assertTrue(dsovf.maybe_cdrom_device('/dev/xvda1')) self.assertTrue(dsovf.maybe_cdrom_device('xvdza1')) @mock.patch(MPATH + "subp.which") @mock.patch(MPATH + "subp.subp") class TestTransportVmwareGuestinfo(CiTestCase): """Test the com.vmware.guestInfo transport implemented in transport_vmware_guestinfo.""" rpctool = 'vmware-rpctool' with_logs = True rpctool_path = '/not/important/vmware-rpctool' def test_without_vmware_rpctool_returns_notfound(self, m_subp, m_which): m_which.return_value = None self.assertEqual(NOT_FOUND, dsovf.transport_vmware_guestinfo()) self.assertEqual(0, m_subp.call_count, "subp should not be called if no rpctool in path.") def test_notfound_on_exit_code_1(self, m_subp, m_which): """If vmware-rpctool exits 1, then must return not found.""" m_which.return_value = self.rpctool_path m_subp.side_effect = subp.ProcessExecutionError( stdout="", stderr="No value found", exit_code=1, cmd=["unused"]) self.assertEqual(NOT_FOUND, dsovf.transport_vmware_guestinfo()) self.assertEqual(1, m_subp.call_count) self.assertNotIn("WARNING", self.logs.getvalue(), "exit code of 1 by rpctool should not cause warning.") def test_notfound_if_no_content_but_exit_zero(self, m_subp, m_which): """If vmware-rpctool exited 0 with no stdout is normal not-found. This isn't actually a case I've seen. normally on "not found", rpctool would exit 1 with 'No value found' on stderr. But cover the case where it exited 0 and just wrote nothing to stdout. """ m_which.return_value = self.rpctool_path m_subp.return_value = ('', '') self.assertEqual(NOT_FOUND, dsovf.transport_vmware_guestinfo()) self.assertEqual(1, m_subp.call_count) def test_notfound_and_warns_on_unexpected_exit_code(self, m_subp, m_which): """If vmware-rpctool exits non zero or 1, warnings should be logged.""" m_which.return_value = self.rpctool_path m_subp.side_effect = subp.ProcessExecutionError( stdout=None, stderr="No value found", exit_code=2, cmd=["unused"]) self.assertEqual(NOT_FOUND, dsovf.transport_vmware_guestinfo()) self.assertEqual(1, m_subp.call_count) self.assertIn("WARNING", self.logs.getvalue(), "exit code of 2 by rpctool should log WARNING.") def test_found_when_guestinfo_present(self, m_subp, m_which): """When there is a ovf info, transport should return it.""" m_which.return_value = self.rpctool_path content = fill_properties({}) m_subp.return_value = (content, '') self.assertEqual(content, dsovf.transport_vmware_guestinfo()) self.assertEqual(1, m_subp.call_count) # # vi: ts=4 expandtab