summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py123
-rw-r--r--tests/unittests/test_datasource/test_digitalocean.py46
-rw-r--r--tests/unittests/test_handler/test_handler_yum_add_repo.py56
-rw-r--r--tests/unittests/test_net.py30
-rw-r--r--tests/unittests/test_util.py33
5 files changed, 194 insertions, 94 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index b0ad86ab..63a2b04d 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -10,6 +10,7 @@
This test file exercises the code in sources DataSourceAltCloud.py
'''
+import mock
import os
import shutil
import tempfile
@@ -18,10 +19,7 @@ from cloudinit import helpers
from cloudinit import util
from unittest import TestCase
-# Get the cloudinit.sources.DataSourceAltCloud import items needed.
-import cloudinit.sources.DataSourceAltCloud
-from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud
-from cloudinit.sources.DataSourceAltCloud import read_user_data_callback
+import cloudinit.sources.DataSourceAltCloud as dsac
OS_UNAME_ORIG = getattr(os, 'uname')
@@ -32,17 +30,17 @@ def _write_cloud_info_file(value):
with a cloud backend identifier ImageFactory when building
an image with ImageFactory.
'''
- cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')
+ cifile = open(dsac.CLOUD_INFO_FILE, 'w')
cifile.write(value)
cifile.close()
- os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0o664)
+ os.chmod(dsac.CLOUD_INFO_FILE, 0o664)
def _remove_cloud_info_file():
'''
Remove the test CLOUD_INFO_FILE
'''
- os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE)
+ os.remove(dsac.CLOUD_INFO_FILE)
def _write_user_data_files(mount_dir, value):
@@ -122,7 +120,7 @@ class TestGetCloudType(TestCase):
Forcing read_dmi_data return to match a RHEVm system: RHEV Hypervisor
'''
util.read_dmi_data = _dmi_data('RHEV')
- dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual('RHEV', dsrc.get_cloud_type())
def test_vsphere(self):
@@ -131,7 +129,7 @@ class TestGetCloudType(TestCase):
Forcing read_dmi_data return to match a vSphere system: RHEV Hypervisor
'''
util.read_dmi_data = _dmi_data('VMware Virtual Platform')
- dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual('VSPHERE', dsrc.get_cloud_type())
def test_unknown(self):
@@ -140,7 +138,7 @@ class TestGetCloudType(TestCase):
Forcing read_dmi_data return to match an unrecognized return.
'''
util.read_dmi_data = _dmi_data('Unrecognized Platform')
- dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual('UNKNOWN', dsrc.get_cloud_type())
@@ -154,8 +152,7 @@ class TestGetDataCloudInfoFile(TestCase):
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
self.cloud_info_file = tempfile.mkstemp()[1]
self.dmi_data = util.read_dmi_data
- cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
- self.cloud_info_file
+ dsac.CLOUD_INFO_FILE = self.cloud_info_file
def tearDown(self):
# Reset
@@ -167,14 +164,13 @@ class TestGetDataCloudInfoFile(TestCase):
pass
util.read_dmi_data = self.dmi_data
- cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
- '/etc/sysconfig/cloud-info'
+ dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
def test_rhev(self):
'''Success Test module get_data() forcing RHEV.'''
_write_cloud_info_file('RHEV')
- dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda: True
self.assertEqual(True, dsrc.get_data())
@@ -182,7 +178,7 @@ class TestGetDataCloudInfoFile(TestCase):
'''Success Test module get_data() forcing VSPHERE.'''
_write_cloud_info_file('VSPHERE')
- dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda: True
self.assertEqual(True, dsrc.get_data())
@@ -190,7 +186,7 @@ class TestGetDataCloudInfoFile(TestCase):
'''Failure Test module get_data() forcing RHEV.'''
_write_cloud_info_file('RHEV')
- dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda: False
self.assertEqual(False, dsrc.get_data())
@@ -198,7 +194,7 @@ class TestGetDataCloudInfoFile(TestCase):
'''Failure Test module get_data() forcing VSPHERE.'''
_write_cloud_info_file('VSPHERE')
- dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda: False
self.assertEqual(False, dsrc.get_data())
@@ -206,7 +202,7 @@ class TestGetDataCloudInfoFile(TestCase):
'''Failure Test module get_data() forcing unrecognized.'''
_write_cloud_info_file('unrecognized')
- dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.get_data())
@@ -219,7 +215,7 @@ class TestGetDataNoCloudInfoFile(TestCase):
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
self.dmi_data = util.read_dmi_data
- cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ dsac.CLOUD_INFO_FILE = \
'no such file'
# We have a different code path for arm to deal with LP1243287
# We have to switch arch to x86_64 to avoid test failure
@@ -227,7 +223,7 @@ class TestGetDataNoCloudInfoFile(TestCase):
def tearDown(self):
# Reset
- cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ dsac.CLOUD_INFO_FILE = \
'/etc/sysconfig/cloud-info'
util.read_dmi_data = self.dmi_data
# Return back to original arch
@@ -237,7 +233,7 @@ class TestGetDataNoCloudInfoFile(TestCase):
'''Test No cloud info file module get_data() forcing RHEV.'''
util.read_dmi_data = _dmi_data('RHEV Hypervisor')
- dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda: True
self.assertEqual(True, dsrc.get_data())
@@ -245,7 +241,7 @@ class TestGetDataNoCloudInfoFile(TestCase):
'''Test No cloud info file module get_data() forcing VSPHERE.'''
util.read_dmi_data = _dmi_data('VMware Virtual Platform')
- dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda: True
self.assertEqual(True, dsrc.get_data())
@@ -253,7 +249,7 @@ class TestGetDataNoCloudInfoFile(TestCase):
'''Test No cloud info file module get_data() forcing unrecognized.'''
util.read_dmi_data = _dmi_data('Unrecognized Platform')
- dsrc = DataSourceAltCloud({}, None, self.paths)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.get_data())
@@ -261,11 +257,14 @@ class TestUserDataRhevm(TestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_rhevm()
'''
+ cmd_pass = ['true']
+ cmd_fail = ['false']
+ cmd_not_found = ['bogus bad command']
+
def setUp(self):
'''Set up.'''
self.paths = helpers.Paths({'cloud_dir': '/tmp'})
self.mount_dir = tempfile.mkdtemp()
-
_write_user_data_files(self.mount_dir, 'test user data')
def tearDown(self):
@@ -279,61 +278,44 @@ class TestUserDataRhevm(TestCase):
except OSError:
pass
- cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
- '/etc/sysconfig/cloud-info'
- cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
- ['/sbin/modprobe', 'floppy']
- cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
- ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5']
+ dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
+ dsac.CMD_PROBE_FLOPPY = ['/sbin/modprobe', 'floppy']
+ dsac.CMD_UDEVADM_SETTLE = ['/sbin/udevadm', 'settle',
+ '--quiet', '--timeout=5']
def test_mount_cb_fails(self):
'''Test user_data_rhevm() where mount_cb fails.'''
- cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
- ['echo', 'modprobe floppy']
-
- dsrc = DataSourceAltCloud({}, None, self.paths)
-
+ dsac.CMD_PROBE_FLOPPY = self.cmd_pass
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_modprobe_fails(self):
'''Test user_data_rhevm() where modprobe fails.'''
- cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
- ['ls', 'modprobe floppy']
-
- dsrc = DataSourceAltCloud({}, None, self.paths)
-
+ dsac.CMD_PROBE_FLOPPY = self.cmd_fail
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_modprobe_cmd(self):
'''Test user_data_rhevm() with no modprobe command.'''
- cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
- ['bad command', 'modprobe floppy']
-
- dsrc = DataSourceAltCloud({}, None, self.paths)
-
+ dsac.CMD_PROBE_FLOPPY = self.cmd_not_found
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_udevadm_fails(self):
'''Test user_data_rhevm() where udevadm fails.'''
- cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
- ['ls', 'udevadm floppy']
-
- dsrc = DataSourceAltCloud({}, None, self.paths)
-
+ dsac.CMD_UDEVADM_SETTLE = self.cmd_fail
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
def test_no_udevadm_cmd(self):
'''Test user_data_rhevm() with no udevadm command.'''
- cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
- ['bad command', 'udevadm floppy']
-
- dsrc = DataSourceAltCloud({}, None, self.paths)
-
+ dsac.CMD_UDEVADM_SETTLE = self.cmd_not_found
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_rhevm())
@@ -359,17 +341,30 @@ class TestUserDataVsphere(TestCase):
except OSError:
pass
- cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
+ dsac.CLOUD_INFO_FILE = \
'/etc/sysconfig/cloud-info'
- def test_user_data_vsphere(self):
+ @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with")
+ @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb")
+ def test_user_data_vsphere_no_cdrom(self, m_mount_cb, m_find_devs_with):
'''Test user_data_vsphere() where mount_cb fails.'''
- cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir
+ m_mount_cb.return_value = []
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
+ self.assertEqual(False, dsrc.user_data_vsphere())
+ self.assertEqual(0, m_mount_cb.call_count)
- dsrc = DataSourceAltCloud({}, None, self.paths)
+ @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with")
+ @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb")
+ def test_user_data_vsphere_mcb_fail(self, m_mount_cb, m_find_devs_with):
+ '''Test user_data_vsphere() where mount_cb fails.'''
+ m_find_devs_with.return_value = ["/dev/mock/cdrom"]
+ m_mount_cb.side_effect = util.MountFailedError("Unable To mount")
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
self.assertEqual(False, dsrc.user_data_vsphere())
+ self.assertEqual(1, m_find_devs_with.call_count)
+ self.assertEqual(1, m_mount_cb.call_count)
class TestReadUserDataCallback(TestCase):
@@ -398,7 +393,7 @@ class TestReadUserDataCallback(TestCase):
'''Test read_user_data_callback() with both files.'''
self.assertEqual('test user data',
- read_user_data_callback(self.mount_dir))
+ dsac.read_user_data_callback(self.mount_dir))
def test_callback_dc(self):
'''Test read_user_data_callback() with only DC file.'''
@@ -408,7 +403,7 @@ class TestReadUserDataCallback(TestCase):
non_dc_file=True)
self.assertEqual('test user data',
- read_user_data_callback(self.mount_dir))
+ dsac.read_user_data_callback(self.mount_dir))
def test_callback_non_dc(self):
'''Test read_user_data_callback() with only non-DC file.'''
@@ -418,13 +413,13 @@ class TestReadUserDataCallback(TestCase):
non_dc_file=False)
self.assertEqual('test user data',
- read_user_data_callback(self.mount_dir))
+ dsac.read_user_data_callback(self.mount_dir))
def test_callback_none(self):
'''Test read_user_data_callback() no files are found.'''
_remove_user_data_files(self.mount_dir)
- self.assertEqual(None, read_user_data_callback(self.mount_dir))
+ self.assertEqual(None, dsac.read_user_data_callback(self.mount_dir))
def force_arch(arch=None):
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index 61d6e001..a11166a9 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -197,7 +197,8 @@ class TestNetworkConvert(TestCase):
@mock.patch('cloudinit.net.get_interfaces_by_mac')
def _get_networking(self, m_get_by_mac):
m_get_by_mac.return_value = {
- '04:01:57:d1:9e:01': 'ens1', '04:01:57:d1:9e:02': 'ens2',
+ '04:01:57:d1:9e:01': 'ens1',
+ '04:01:57:d1:9e:02': 'ens2',
'b8:ae:ed:75:5f:9a': 'enp0s25',
'ae:cc:08:7c:88:00': 'meta2p1'}
netcfg = digitalocean.convert_network_configuration(
@@ -208,18 +209,33 @@ class TestNetworkConvert(TestCase):
def test_networking_defined(self):
netcfg = self._get_networking()
self.assertIsNotNone(netcfg)
+ dns_defined = False
- for nic_def in netcfg.get('config'):
- print(json.dumps(nic_def, indent=3))
- n_type = nic_def.get('type')
- n_subnets = nic_def.get('type')
- n_name = nic_def.get('name')
- n_mac = nic_def.get('mac_address')
+ for part in netcfg.get('config'):
+ n_type = part.get('type')
+ print("testing part ", n_type, "\n", json.dumps(part, indent=3))
+
+ if n_type == 'nameserver':
+ n_address = part.get('address')
+ self.assertIsNotNone(n_address)
+ self.assertEqual(len(n_address), 3)
+
+ dns_resolvers = DO_META["dns"]["nameservers"]
+ for x in n_address:
+ self.assertIn(x, dns_resolvers)
+ dns_defined = True
- self.assertIsNotNone(n_type)
- self.assertIsNotNone(n_subnets)
- self.assertIsNotNone(n_name)
- self.assertIsNotNone(n_mac)
+ else:
+ n_subnets = part.get('type')
+ n_name = part.get('name')
+ n_mac = part.get('mac_address')
+
+ self.assertIsNotNone(n_type)
+ self.assertIsNotNone(n_subnets)
+ self.assertIsNotNone(n_name)
+ self.assertIsNotNone(n_mac)
+
+ self.assertTrue(dns_defined)
def _get_nic_definition(self, int_type, expected_name):
"""helper function to return if_type (i.e. public) and the expected
@@ -260,12 +276,6 @@ class TestNetworkConvert(TestCase):
self.assertEqual(meta_def.get('mac'), nic_def.get('mac_address'))
self.assertEqual('physical', nic_def.get('type'))
- def _check_dns_nameservers(self, subn_def):
- self.assertIn('dns_nameservers', subn_def)
- expected_nameservers = DO_META['dns']['nameservers']
- nic_nameservers = subn_def.get('dns_nameservers')
- self.assertEqual(expected_nameservers, nic_nameservers)
-
def test_public_interface_ipv6(self):
"""test public ipv6 addressing"""
(nic_def, meta_def) = self._get_nic_definition('public', 'eth0')
@@ -280,7 +290,6 @@ class TestNetworkConvert(TestCase):
self.assertEqual(cidr_notated_address, subn_def.get('address'))
self.assertEqual(ipv6_def.get('gateway'), subn_def.get('gateway'))
- self._check_dns_nameservers(subn_def)
def test_public_interface_ipv4(self):
"""test public ipv4 addressing"""
@@ -293,7 +302,6 @@ class TestNetworkConvert(TestCase):
self.assertEqual(ipv4_def.get('netmask'), subn_def.get('netmask'))
self.assertEqual(ipv4_def.get('gateway'), subn_def.get('gateway'))
- self._check_dns_nameservers(subn_def)
def test_public_interface_anchor_ipv4(self):
"""test public ipv4 addressing"""
diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py
index 3feba86c..4815bdb6 100644
--- a/tests/unittests/test_handler/test_handler_yum_add_repo.py
+++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py
@@ -5,10 +5,13 @@ from cloudinit import util
from .. import helpers
-import configobj
+try:
+ from configparser import ConfigParser
+except ImportError:
+ from ConfigParser import ConfigParser
import logging
import shutil
-from six import BytesIO
+from six import StringIO
import tempfile
LOG = logging.getLogger(__name__)
@@ -54,9 +57,9 @@ class TestConfig(helpers.FilesystemMockingTestCase):
}
self.patchUtils(self.tmp)
cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
- contents = util.load_file("/etc/yum.repos.d/epel_testing.repo",
- decode=False)
- contents = configobj.ConfigObj(BytesIO(contents))
+ contents = util.load_file("/etc/yum.repos.d/epel_testing.repo")
+ parser = ConfigParser()
+ parser.readfp(StringIO(contents))
expected = {
'epel_testing': {
'name': 'Extra Packages for Enterprise Linux 5 - Testing',
@@ -67,6 +70,47 @@ class TestConfig(helpers.FilesystemMockingTestCase):
'gpgcheck': '1',
}
}
- self.assertEqual(expected, dict(contents))
+ for section in expected:
+ self.assertTrue(parser.has_section(section),
+ "Contains section {}".format(section))
+ for k, v in expected[section].items():
+ self.assertEqual(parser.get(section, k), v)
+
+ def test_write_config_array(self):
+ cfg = {
+ 'yum_repos': {
+ 'puppetlabs-products': {
+ 'name': 'Puppet Labs Products El 6 - $basearch',
+ 'baseurl':
+ 'http://yum.puppetlabs.com/el/6/products/$basearch',
+ 'gpgkey': [
+ 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-puppetlabs',
+ 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-puppet',
+ ],
+ 'enabled': True,
+ 'gpgcheck': True,
+ }
+ }
+ }
+ self.patchUtils(self.tmp)
+ cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
+ contents = util.load_file("/etc/yum.repos.d/puppetlabs_products.repo")
+ parser = ConfigParser()
+ parser.readfp(StringIO(contents))
+ expected = {
+ 'puppetlabs_products': {
+ 'name': 'Puppet Labs Products El 6 - $basearch',
+ 'baseurl': 'http://yum.puppetlabs.com/el/6/products/$basearch',
+ 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-puppetlabs\n'
+ 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-puppet',
+ 'enabled': '1',
+ 'gpgcheck': '1',
+ }
+ }
+ for section in expected:
+ self.assertTrue(parser.has_section(section),
+ "Contains section {}".format(section))
+ for k, v in expected[section].items():
+ self.assertEqual(parser.get(section, k), v)
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
index 9cc5e4ab..89e75369 100644
--- a/tests/unittests/test_net.py
+++ b/tests/unittests/test_net.py
@@ -1463,13 +1463,16 @@ class TestNetRenderers(CiTestCase):
class TestGetInterfacesByMac(CiTestCase):
_data = {'devices': ['enp0s1', 'enp0s2', 'bond1', 'bridge1',
- 'bridge1-nic', 'tun0'],
+ 'bridge1-nic', 'tun0', 'bond1.101'],
'bonds': ['bond1'],
'bridges': ['bridge1'],
- 'own_macs': ['enp0s1', 'enp0s2', 'bridge1-nic', 'bridge1'],
+ 'vlans': ['bond1.101'],
+ 'own_macs': ['enp0s1', 'enp0s2', 'bridge1-nic', 'bridge1',
+ 'bond1.101'],
'macs': {'enp0s1': 'aa:aa:aa:aa:aa:01',
'enp0s2': 'aa:aa:aa:aa:aa:02',
'bond1': 'aa:aa:aa:aa:aa:01',
+ 'bond1.101': 'aa:aa:aa:aa:aa:01',
'bridge1': 'aa:aa:aa:aa:aa:03',
'bridge1-nic': 'aa:aa:aa:aa:aa:03',
'tun0': None}}
@@ -1484,13 +1487,16 @@ class TestGetInterfacesByMac(CiTestCase):
def _se_is_bridge(self, name):
return name in self.data['bridges']
+ def _se_is_vlan(self, name):
+ return name in self.data['vlans']
+
def _se_interface_has_own_mac(self, name):
return name in self.data['own_macs']
def _mock_setup(self):
self.data = copy.deepcopy(self._data)
mocks = ('get_devicelist', 'get_interface_mac', 'is_bridge',
- 'interface_has_own_mac')
+ 'interface_has_own_mac', 'is_vlan')
self.mocks = {}
for n in mocks:
m = mock.patch('cloudinit.net.' + n,
@@ -1536,6 +1542,24 @@ class TestGetInterfacesByMac(CiTestCase):
mock.call('b1')],
any_order=True)
+ def test_excludes_vlans(self):
+ self._mock_setup()
+ # add a device 'b1', make all return they have their "own mac",
+ # set everything other than 'b1' to be a vlan.
+ # then expect b1 is the only thing left.
+ self.data['macs']['b1'] = 'aa:aa:aa:aa:aa:b1'
+ self.data['devices'].append('b1')
+ self.data['bonds'] = []
+ self.data['bridges'] = []
+ self.data['own_macs'] = self.data['devices']
+ self.data['vlans'] = [f for f in self.data['devices'] if f != "b1"]
+ ret = net.get_interfaces_by_mac()
+ self.assertEqual({'aa:aa:aa:aa:aa:b1': 'b1'}, ret)
+ self.mocks['is_vlan'].assert_has_calls(
+ [mock.call('bridge1'), mock.call('enp0s1'), mock.call('bond1'),
+ mock.call('b1')],
+ any_order=True)
+
def _gzip_data(data):
with io.BytesIO() as iobuf:
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index ab74311e..5d21b4b7 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -103,8 +103,8 @@ class TestWriteFile(helpers.TestCase):
self.assertTrue(os.path.isdir(dirname))
self.assertTrue(os.path.isfile(path))
- def test_custom_mode(self):
- """Verify custom mode works properly."""
+ def test_explicit_mode(self):
+ """Verify explicit file mode works properly."""
path = os.path.join(self.tmp, "NewFile.txt")
contents = "Hey there"
@@ -115,6 +115,35 @@ class TestWriteFile(helpers.TestCase):
file_stat = os.stat(path)
self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode))
+ def test_copy_mode_no_existing(self):
+ """Verify that file is created with mode 0o644 if copy_mode
+ is true and there is no prior existing file."""
+ path = os.path.join(self.tmp, "NewFile.txt")
+ contents = "Hey there"
+
+ util.write_file(path, contents, copy_mode=True)
+
+ self.assertTrue(os.path.exists(path))
+ self.assertTrue(os.path.isfile(path))
+ file_stat = os.stat(path)
+ self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode))
+
+ def test_copy_mode_with_existing(self):
+ """Verify that file is created using mode of existing file
+ if copy_mode is true."""
+ path = os.path.join(self.tmp, "NewFile.txt")
+ contents = "Hey there"
+
+ open(path, 'w').close()
+ os.chmod(path, 0o666)
+
+ util.write_file(path, contents, copy_mode=True)
+
+ self.assertTrue(os.path.exists(path))
+ self.assertTrue(os.path.isfile(path))
+ file_stat = os.stat(path)
+ self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode))
+
def test_custom_omode(self):
"""Verify custom omode works properly."""
path = os.path.join(self.tmp, "NewFile.txt")