From 728b370b68ba8a879ce1653bdf9a572cf007e0d7 Mon Sep 17 00:00:00 2001 From: Scott Moser Date: Tue, 11 Apr 2017 14:05:42 -0400 Subject: tests: fix AltCloud tests to not rely on blkid This just mocks out the AltCloud tests to not invoke blkid. Our tests should not rely on system command returning any specific value. Also, shorten long lines with change in the import name of DataSourceAltCloud. LP: #1636531 --- tests/unittests/test_datasource/test_altcloud.py | 123 +++++++++++------------ 1 file changed, 59 insertions(+), 64 deletions(-) (limited to 'tests/unittests/test_datasource') diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py index b0ad86ab..63a2b04d 100644 --- a/tests/unittests/test_datasource/test_altcloud.py +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -10,6 +10,7 @@ This test file exercises the code in sources DataSourceAltCloud.py ''' +import mock import os import shutil import tempfile @@ -18,10 +19,7 @@ from cloudinit import helpers from cloudinit import util from unittest import TestCase -# Get the cloudinit.sources.DataSourceAltCloud import items needed. -import cloudinit.sources.DataSourceAltCloud -from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud -from cloudinit.sources.DataSourceAltCloud import read_user_data_callback +import cloudinit.sources.DataSourceAltCloud as dsac OS_UNAME_ORIG = getattr(os, 'uname') @@ -32,17 +30,17 @@ def _write_cloud_info_file(value): with a cloud backend identifier ImageFactory when building an image with ImageFactory. ''' - cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w') + cifile = open(dsac.CLOUD_INFO_FILE, 'w') cifile.write(value) cifile.close() - os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0o664) + os.chmod(dsac.CLOUD_INFO_FILE, 0o664) def _remove_cloud_info_file(): ''' Remove the test CLOUD_INFO_FILE ''' - os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE) + os.remove(dsac.CLOUD_INFO_FILE) def _write_user_data_files(mount_dir, value): @@ -122,7 +120,7 @@ class TestGetCloudType(TestCase): Forcing read_dmi_data return to match a RHEVm system: RHEV Hypervisor ''' util.read_dmi_data = _dmi_data('RHEV') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual('RHEV', dsrc.get_cloud_type()) def test_vsphere(self): @@ -131,7 +129,7 @@ class TestGetCloudType(TestCase): Forcing read_dmi_data return to match a vSphere system: RHEV Hypervisor ''' util.read_dmi_data = _dmi_data('VMware Virtual Platform') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual('VSPHERE', dsrc.get_cloud_type()) def test_unknown(self): @@ -140,7 +138,7 @@ class TestGetCloudType(TestCase): Forcing read_dmi_data return to match an unrecognized return. ''' util.read_dmi_data = _dmi_data('Unrecognized Platform') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual('UNKNOWN', dsrc.get_cloud_type()) @@ -154,8 +152,7 @@ class TestGetDataCloudInfoFile(TestCase): self.paths = helpers.Paths({'cloud_dir': '/tmp'}) self.cloud_info_file = tempfile.mkstemp()[1] self.dmi_data = util.read_dmi_data - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - self.cloud_info_file + dsac.CLOUD_INFO_FILE = self.cloud_info_file def tearDown(self): # Reset @@ -167,14 +164,13 @@ class TestGetDataCloudInfoFile(TestCase): pass util.read_dmi_data = self.dmi_data - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - '/etc/sysconfig/cloud-info' + dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' def test_rhev(self): '''Success Test module get_data() forcing RHEV.''' _write_cloud_info_file('RHEV') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: True self.assertEqual(True, dsrc.get_data()) @@ -182,7 +178,7 @@ class TestGetDataCloudInfoFile(TestCase): '''Success Test module get_data() forcing VSPHERE.''' _write_cloud_info_file('VSPHERE') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: True self.assertEqual(True, dsrc.get_data()) @@ -190,7 +186,7 @@ class TestGetDataCloudInfoFile(TestCase): '''Failure Test module get_data() forcing RHEV.''' _write_cloud_info_file('RHEV') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: False self.assertEqual(False, dsrc.get_data()) @@ -198,7 +194,7 @@ class TestGetDataCloudInfoFile(TestCase): '''Failure Test module get_data() forcing VSPHERE.''' _write_cloud_info_file('VSPHERE') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: False self.assertEqual(False, dsrc.get_data()) @@ -206,7 +202,7 @@ class TestGetDataCloudInfoFile(TestCase): '''Failure Test module get_data() forcing unrecognized.''' _write_cloud_info_file('unrecognized') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.get_data()) @@ -219,7 +215,7 @@ class TestGetDataNoCloudInfoFile(TestCase): '''Set up.''' self.paths = helpers.Paths({'cloud_dir': '/tmp'}) self.dmi_data = util.read_dmi_data - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + dsac.CLOUD_INFO_FILE = \ 'no such file' # We have a different code path for arm to deal with LP1243287 # We have to switch arch to x86_64 to avoid test failure @@ -227,7 +223,7 @@ class TestGetDataNoCloudInfoFile(TestCase): def tearDown(self): # Reset - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + dsac.CLOUD_INFO_FILE = \ '/etc/sysconfig/cloud-info' util.read_dmi_data = self.dmi_data # Return back to original arch @@ -237,7 +233,7 @@ class TestGetDataNoCloudInfoFile(TestCase): '''Test No cloud info file module get_data() forcing RHEV.''' util.read_dmi_data = _dmi_data('RHEV Hypervisor') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: True self.assertEqual(True, dsrc.get_data()) @@ -245,7 +241,7 @@ class TestGetDataNoCloudInfoFile(TestCase): '''Test No cloud info file module get_data() forcing VSPHERE.''' util.read_dmi_data = _dmi_data('VMware Virtual Platform') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: True self.assertEqual(True, dsrc.get_data()) @@ -253,7 +249,7 @@ class TestGetDataNoCloudInfoFile(TestCase): '''Test No cloud info file module get_data() forcing unrecognized.''' util.read_dmi_data = _dmi_data('Unrecognized Platform') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.get_data()) @@ -261,11 +257,14 @@ class TestUserDataRhevm(TestCase): ''' Test to exercise method: DataSourceAltCloud.user_data_rhevm() ''' + cmd_pass = ['true'] + cmd_fail = ['false'] + cmd_not_found = ['bogus bad command'] + def setUp(self): '''Set up.''' self.paths = helpers.Paths({'cloud_dir': '/tmp'}) self.mount_dir = tempfile.mkdtemp() - _write_user_data_files(self.mount_dir, 'test user data') def tearDown(self): @@ -279,61 +278,44 @@ class TestUserDataRhevm(TestCase): except OSError: pass - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - '/etc/sysconfig/cloud-info' - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['/sbin/modprobe', 'floppy'] - cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ - ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5'] + dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + dsac.CMD_PROBE_FLOPPY = ['/sbin/modprobe', 'floppy'] + dsac.CMD_UDEVADM_SETTLE = ['/sbin/udevadm', 'settle', + '--quiet', '--timeout=5'] def test_mount_cb_fails(self): '''Test user_data_rhevm() where mount_cb fails.''' - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['echo', 'modprobe floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - + dsac.CMD_PROBE_FLOPPY = self.cmd_pass + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) def test_modprobe_fails(self): '''Test user_data_rhevm() where modprobe fails.''' - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['ls', 'modprobe floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - + dsac.CMD_PROBE_FLOPPY = self.cmd_fail + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) def test_no_modprobe_cmd(self): '''Test user_data_rhevm() with no modprobe command.''' - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['bad command', 'modprobe floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - + dsac.CMD_PROBE_FLOPPY = self.cmd_not_found + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) def test_udevadm_fails(self): '''Test user_data_rhevm() where udevadm fails.''' - cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ - ['ls', 'udevadm floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - + dsac.CMD_UDEVADM_SETTLE = self.cmd_fail + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) def test_no_udevadm_cmd(self): '''Test user_data_rhevm() with no udevadm command.''' - cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ - ['bad command', 'udevadm floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - + dsac.CMD_UDEVADM_SETTLE = self.cmd_not_found + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) @@ -359,17 +341,30 @@ class TestUserDataVsphere(TestCase): except OSError: pass - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + dsac.CLOUD_INFO_FILE = \ '/etc/sysconfig/cloud-info' - def test_user_data_vsphere(self): + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with") + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb") + def test_user_data_vsphere_no_cdrom(self, m_mount_cb, m_find_devs_with): '''Test user_data_vsphere() where mount_cb fails.''' - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir + m_mount_cb.return_value = [] + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual(False, dsrc.user_data_vsphere()) + self.assertEqual(0, m_mount_cb.call_count) - dsrc = DataSourceAltCloud({}, None, self.paths) + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with") + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb") + def test_user_data_vsphere_mcb_fail(self, m_mount_cb, m_find_devs_with): + '''Test user_data_vsphere() where mount_cb fails.''' + m_find_devs_with.return_value = ["/dev/mock/cdrom"] + m_mount_cb.side_effect = util.MountFailedError("Unable To mount") + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_vsphere()) + self.assertEqual(1, m_find_devs_with.call_count) + self.assertEqual(1, m_mount_cb.call_count) class TestReadUserDataCallback(TestCase): @@ -398,7 +393,7 @@ class TestReadUserDataCallback(TestCase): '''Test read_user_data_callback() with both files.''' self.assertEqual('test user data', - read_user_data_callback(self.mount_dir)) + dsac.read_user_data_callback(self.mount_dir)) def test_callback_dc(self): '''Test read_user_data_callback() with only DC file.''' @@ -408,7 +403,7 @@ class TestReadUserDataCallback(TestCase): non_dc_file=True) self.assertEqual('test user data', - read_user_data_callback(self.mount_dir)) + dsac.read_user_data_callback(self.mount_dir)) def test_callback_non_dc(self): '''Test read_user_data_callback() with only non-DC file.''' @@ -418,13 +413,13 @@ class TestReadUserDataCallback(TestCase): non_dc_file=False) self.assertEqual('test user data', - read_user_data_callback(self.mount_dir)) + dsac.read_user_data_callback(self.mount_dir)) def test_callback_none(self): '''Test read_user_data_callback() no files are found.''' _remove_user_data_files(self.mount_dir) - self.assertEqual(None, read_user_data_callback(self.mount_dir)) + self.assertEqual(None, dsac.read_user_data_callback(self.mount_dir)) def force_arch(arch=None): -- cgit v1.2.3 From 493f6c3e923902d5d4f3d87e1cc4c726ea90ada4 Mon Sep 17 00:00:00 2001 From: Ben Howard Date: Tue, 11 Apr 2017 10:00:12 -0600 Subject: DigitalOcean: bind resolvers to loopback interface. This change makes the DigitalOcean datasource consistent with OpenStack and Joyent by binding the resolver addresses to the loopback interface. This _is_ a work-around to bug 1675571. Part of bug 1676908. --- cloudinit/sources/helpers/digitalocean.py | 18 ++++----- .../unittests/test_datasource/test_digitalocean.py | 46 +++++++++++++--------- 2 files changed, 34 insertions(+), 30 deletions(-) (limited to 'tests/unittests/test_datasource') diff --git a/cloudinit/sources/helpers/digitalocean.py b/cloudinit/sources/helpers/digitalocean.py index 72f7bde4..6423c8ef 100644 --- a/cloudinit/sources/helpers/digitalocean.py +++ b/cloudinit/sources/helpers/digitalocean.py @@ -107,15 +107,12 @@ def convert_network_configuration(config, dns_servers): } """ - def _get_subnet_part(pcfg, nameservers=None): + def _get_subnet_part(pcfg): subpart = {'type': 'static', 'control': 'auto', 'address': pcfg.get('ip_address'), 'gateway': pcfg.get('gateway')} - if nameservers: - subpart['dns_nameservers'] = nameservers - if ":" in pcfg.get('ip_address'): subpart['address'] = "{0}/{1}".format(pcfg.get('ip_address'), pcfg.get('cidr')) @@ -157,13 +154,8 @@ def convert_network_configuration(config, dns_servers): continue sub_part = _get_subnet_part(raw_subnet) - if nic_type == 'public' and 'anchor' not in netdef: - # add DNS resolvers to the public interfaces only - sub_part = _get_subnet_part(raw_subnet, dns_servers) - else: - # remove the gateway any non-public interfaces - if 'gateway' in sub_part: - del sub_part['gateway'] + if netdef in ('private', 'anchor_ipv4', 'anchor_ipv6'): + del sub_part['gateway'] subnets.append(sub_part) @@ -171,6 +163,10 @@ def convert_network_configuration(config, dns_servers): nic_configs.append(ncfg) LOG.debug("nic '%s' configuration: %s", if_name, ncfg) + if dns_servers: + LOG.debug("added dns servers: %s", dns_servers) + nic_configs.append({'type': 'nameserver', 'address': dns_servers}) + return {'version': 1, 'config': nic_configs} diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py index 61d6e001..a11166a9 100644 --- a/tests/unittests/test_datasource/test_digitalocean.py +++ b/tests/unittests/test_datasource/test_digitalocean.py @@ -197,7 +197,8 @@ class TestNetworkConvert(TestCase): @mock.patch('cloudinit.net.get_interfaces_by_mac') def _get_networking(self, m_get_by_mac): m_get_by_mac.return_value = { - '04:01:57:d1:9e:01': 'ens1', '04:01:57:d1:9e:02': 'ens2', + '04:01:57:d1:9e:01': 'ens1', + '04:01:57:d1:9e:02': 'ens2', 'b8:ae:ed:75:5f:9a': 'enp0s25', 'ae:cc:08:7c:88:00': 'meta2p1'} netcfg = digitalocean.convert_network_configuration( @@ -208,18 +209,33 @@ class TestNetworkConvert(TestCase): def test_networking_defined(self): netcfg = self._get_networking() self.assertIsNotNone(netcfg) + dns_defined = False - for nic_def in netcfg.get('config'): - print(json.dumps(nic_def, indent=3)) - n_type = nic_def.get('type') - n_subnets = nic_def.get('type') - n_name = nic_def.get('name') - n_mac = nic_def.get('mac_address') + for part in netcfg.get('config'): + n_type = part.get('type') + print("testing part ", n_type, "\n", json.dumps(part, indent=3)) + + if n_type == 'nameserver': + n_address = part.get('address') + self.assertIsNotNone(n_address) + self.assertEqual(len(n_address), 3) + + dns_resolvers = DO_META["dns"]["nameservers"] + for x in n_address: + self.assertIn(x, dns_resolvers) + dns_defined = True - self.assertIsNotNone(n_type) - self.assertIsNotNone(n_subnets) - self.assertIsNotNone(n_name) - self.assertIsNotNone(n_mac) + else: + n_subnets = part.get('type') + n_name = part.get('name') + n_mac = part.get('mac_address') + + self.assertIsNotNone(n_type) + self.assertIsNotNone(n_subnets) + self.assertIsNotNone(n_name) + self.assertIsNotNone(n_mac) + + self.assertTrue(dns_defined) def _get_nic_definition(self, int_type, expected_name): """helper function to return if_type (i.e. public) and the expected @@ -260,12 +276,6 @@ class TestNetworkConvert(TestCase): self.assertEqual(meta_def.get('mac'), nic_def.get('mac_address')) self.assertEqual('physical', nic_def.get('type')) - def _check_dns_nameservers(self, subn_def): - self.assertIn('dns_nameservers', subn_def) - expected_nameservers = DO_META['dns']['nameservers'] - nic_nameservers = subn_def.get('dns_nameservers') - self.assertEqual(expected_nameservers, nic_nameservers) - def test_public_interface_ipv6(self): """test public ipv6 addressing""" (nic_def, meta_def) = self._get_nic_definition('public', 'eth0') @@ -280,7 +290,6 @@ class TestNetworkConvert(TestCase): self.assertEqual(cidr_notated_address, subn_def.get('address')) self.assertEqual(ipv6_def.get('gateway'), subn_def.get('gateway')) - self._check_dns_nameservers(subn_def) def test_public_interface_ipv4(self): """test public ipv4 addressing""" @@ -293,7 +302,6 @@ class TestNetworkConvert(TestCase): self.assertEqual(ipv4_def.get('netmask'), subn_def.get('netmask')) self.assertEqual(ipv4_def.get('gateway'), subn_def.get('gateway')) - self._check_dns_nameservers(subn_def) def test_public_interface_anchor_ipv4(self): """test public ipv4 addressing""" -- cgit v1.2.3