diff options
Diffstat (limited to 'tests/unittests')
-rw-r--r-- | tests/unittests/test_datasource/test_altcloud.py | 123 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_digitalocean.py | 46 | ||||
-rw-r--r-- | tests/unittests/test_handler/test_handler_yum_add_repo.py | 56 | ||||
-rw-r--r-- | tests/unittests/test_net.py | 30 | ||||
-rw-r--r-- | tests/unittests/test_util.py | 33 |
5 files changed, 194 insertions, 94 deletions
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py index b0ad86ab..63a2b04d 100644 --- a/tests/unittests/test_datasource/test_altcloud.py +++ b/tests/unittests/test_datasource/test_altcloud.py @@ -10,6 +10,7 @@ This test file exercises the code in sources DataSourceAltCloud.py ''' +import mock import os import shutil import tempfile @@ -18,10 +19,7 @@ from cloudinit import helpers from cloudinit import util from unittest import TestCase -# Get the cloudinit.sources.DataSourceAltCloud import items needed. -import cloudinit.sources.DataSourceAltCloud -from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud -from cloudinit.sources.DataSourceAltCloud import read_user_data_callback +import cloudinit.sources.DataSourceAltCloud as dsac OS_UNAME_ORIG = getattr(os, 'uname') @@ -32,17 +30,17 @@ def _write_cloud_info_file(value): with a cloud backend identifier ImageFactory when building an image with ImageFactory. ''' - cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w') + cifile = open(dsac.CLOUD_INFO_FILE, 'w') cifile.write(value) cifile.close() - os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0o664) + os.chmod(dsac.CLOUD_INFO_FILE, 0o664) def _remove_cloud_info_file(): ''' Remove the test CLOUD_INFO_FILE ''' - os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE) + os.remove(dsac.CLOUD_INFO_FILE) def _write_user_data_files(mount_dir, value): @@ -122,7 +120,7 @@ class TestGetCloudType(TestCase): Forcing read_dmi_data return to match a RHEVm system: RHEV Hypervisor ''' util.read_dmi_data = _dmi_data('RHEV') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual('RHEV', dsrc.get_cloud_type()) def test_vsphere(self): @@ -131,7 +129,7 @@ class TestGetCloudType(TestCase): Forcing read_dmi_data return to match a vSphere system: RHEV Hypervisor ''' util.read_dmi_data = _dmi_data('VMware Virtual Platform') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual('VSPHERE', dsrc.get_cloud_type()) def test_unknown(self): @@ -140,7 +138,7 @@ class TestGetCloudType(TestCase): Forcing read_dmi_data return to match an unrecognized return. ''' util.read_dmi_data = _dmi_data('Unrecognized Platform') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual('UNKNOWN', dsrc.get_cloud_type()) @@ -154,8 +152,7 @@ class TestGetDataCloudInfoFile(TestCase): self.paths = helpers.Paths({'cloud_dir': '/tmp'}) self.cloud_info_file = tempfile.mkstemp()[1] self.dmi_data = util.read_dmi_data - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - self.cloud_info_file + dsac.CLOUD_INFO_FILE = self.cloud_info_file def tearDown(self): # Reset @@ -167,14 +164,13 @@ class TestGetDataCloudInfoFile(TestCase): pass util.read_dmi_data = self.dmi_data - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - '/etc/sysconfig/cloud-info' + dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' def test_rhev(self): '''Success Test module get_data() forcing RHEV.''' _write_cloud_info_file('RHEV') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: True self.assertEqual(True, dsrc.get_data()) @@ -182,7 +178,7 @@ class TestGetDataCloudInfoFile(TestCase): '''Success Test module get_data() forcing VSPHERE.''' _write_cloud_info_file('VSPHERE') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: True self.assertEqual(True, dsrc.get_data()) @@ -190,7 +186,7 @@ class TestGetDataCloudInfoFile(TestCase): '''Failure Test module get_data() forcing RHEV.''' _write_cloud_info_file('RHEV') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: False self.assertEqual(False, dsrc.get_data()) @@ -198,7 +194,7 @@ class TestGetDataCloudInfoFile(TestCase): '''Failure Test module get_data() forcing VSPHERE.''' _write_cloud_info_file('VSPHERE') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: False self.assertEqual(False, dsrc.get_data()) @@ -206,7 +202,7 @@ class TestGetDataCloudInfoFile(TestCase): '''Failure Test module get_data() forcing unrecognized.''' _write_cloud_info_file('unrecognized') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.get_data()) @@ -219,7 +215,7 @@ class TestGetDataNoCloudInfoFile(TestCase): '''Set up.''' self.paths = helpers.Paths({'cloud_dir': '/tmp'}) self.dmi_data = util.read_dmi_data - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + dsac.CLOUD_INFO_FILE = \ 'no such file' # We have a different code path for arm to deal with LP1243287 # We have to switch arch to x86_64 to avoid test failure @@ -227,7 +223,7 @@ class TestGetDataNoCloudInfoFile(TestCase): def tearDown(self): # Reset - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + dsac.CLOUD_INFO_FILE = \ '/etc/sysconfig/cloud-info' util.read_dmi_data = self.dmi_data # Return back to original arch @@ -237,7 +233,7 @@ class TestGetDataNoCloudInfoFile(TestCase): '''Test No cloud info file module get_data() forcing RHEV.''' util.read_dmi_data = _dmi_data('RHEV Hypervisor') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) dsrc.user_data_rhevm = lambda: True self.assertEqual(True, dsrc.get_data()) @@ -245,7 +241,7 @@ class TestGetDataNoCloudInfoFile(TestCase): '''Test No cloud info file module get_data() forcing VSPHERE.''' util.read_dmi_data = _dmi_data('VMware Virtual Platform') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) dsrc.user_data_vsphere = lambda: True self.assertEqual(True, dsrc.get_data()) @@ -253,7 +249,7 @@ class TestGetDataNoCloudInfoFile(TestCase): '''Test No cloud info file module get_data() forcing unrecognized.''' util.read_dmi_data = _dmi_data('Unrecognized Platform') - dsrc = DataSourceAltCloud({}, None, self.paths) + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.get_data()) @@ -261,11 +257,14 @@ class TestUserDataRhevm(TestCase): ''' Test to exercise method: DataSourceAltCloud.user_data_rhevm() ''' + cmd_pass = ['true'] + cmd_fail = ['false'] + cmd_not_found = ['bogus bad command'] + def setUp(self): '''Set up.''' self.paths = helpers.Paths({'cloud_dir': '/tmp'}) self.mount_dir = tempfile.mkdtemp() - _write_user_data_files(self.mount_dir, 'test user data') def tearDown(self): @@ -279,61 +278,44 @@ class TestUserDataRhevm(TestCase): except OSError: pass - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ - '/etc/sysconfig/cloud-info' - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['/sbin/modprobe', 'floppy'] - cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ - ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5'] + dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info' + dsac.CMD_PROBE_FLOPPY = ['/sbin/modprobe', 'floppy'] + dsac.CMD_UDEVADM_SETTLE = ['/sbin/udevadm', 'settle', + '--quiet', '--timeout=5'] def test_mount_cb_fails(self): '''Test user_data_rhevm() where mount_cb fails.''' - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['echo', 'modprobe floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - + dsac.CMD_PROBE_FLOPPY = self.cmd_pass + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) def test_modprobe_fails(self): '''Test user_data_rhevm() where modprobe fails.''' - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['ls', 'modprobe floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - + dsac.CMD_PROBE_FLOPPY = self.cmd_fail + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) def test_no_modprobe_cmd(self): '''Test user_data_rhevm() with no modprobe command.''' - cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \ - ['bad command', 'modprobe floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - + dsac.CMD_PROBE_FLOPPY = self.cmd_not_found + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) def test_udevadm_fails(self): '''Test user_data_rhevm() where udevadm fails.''' - cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ - ['ls', 'udevadm floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - + dsac.CMD_UDEVADM_SETTLE = self.cmd_fail + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) def test_no_udevadm_cmd(self): '''Test user_data_rhevm() with no udevadm command.''' - cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \ - ['bad command', 'udevadm floppy'] - - dsrc = DataSourceAltCloud({}, None, self.paths) - + dsac.CMD_UDEVADM_SETTLE = self.cmd_not_found + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_rhevm()) @@ -359,17 +341,30 @@ class TestUserDataVsphere(TestCase): except OSError: pass - cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \ + dsac.CLOUD_INFO_FILE = \ '/etc/sysconfig/cloud-info' - def test_user_data_vsphere(self): + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with") + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb") + def test_user_data_vsphere_no_cdrom(self, m_mount_cb, m_find_devs_with): '''Test user_data_vsphere() where mount_cb fails.''' - cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir + m_mount_cb.return_value = [] + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) + self.assertEqual(False, dsrc.user_data_vsphere()) + self.assertEqual(0, m_mount_cb.call_count) - dsrc = DataSourceAltCloud({}, None, self.paths) + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with") + @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb") + def test_user_data_vsphere_mcb_fail(self, m_mount_cb, m_find_devs_with): + '''Test user_data_vsphere() where mount_cb fails.''' + m_find_devs_with.return_value = ["/dev/mock/cdrom"] + m_mount_cb.side_effect = util.MountFailedError("Unable To mount") + dsrc = dsac.DataSourceAltCloud({}, None, self.paths) self.assertEqual(False, dsrc.user_data_vsphere()) + self.assertEqual(1, m_find_devs_with.call_count) + self.assertEqual(1, m_mount_cb.call_count) class TestReadUserDataCallback(TestCase): @@ -398,7 +393,7 @@ class TestReadUserDataCallback(TestCase): '''Test read_user_data_callback() with both files.''' self.assertEqual('test user data', - read_user_data_callback(self.mount_dir)) + dsac.read_user_data_callback(self.mount_dir)) def test_callback_dc(self): '''Test read_user_data_callback() with only DC file.''' @@ -408,7 +403,7 @@ class TestReadUserDataCallback(TestCase): non_dc_file=True) self.assertEqual('test user data', - read_user_data_callback(self.mount_dir)) + dsac.read_user_data_callback(self.mount_dir)) def test_callback_non_dc(self): '''Test read_user_data_callback() with only non-DC file.''' @@ -418,13 +413,13 @@ class TestReadUserDataCallback(TestCase): non_dc_file=False) self.assertEqual('test user data', - read_user_data_callback(self.mount_dir)) + dsac.read_user_data_callback(self.mount_dir)) def test_callback_none(self): '''Test read_user_data_callback() no files are found.''' _remove_user_data_files(self.mount_dir) - self.assertEqual(None, read_user_data_callback(self.mount_dir)) + self.assertEqual(None, dsac.read_user_data_callback(self.mount_dir)) def force_arch(arch=None): diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py index 61d6e001..a11166a9 100644 --- a/tests/unittests/test_datasource/test_digitalocean.py +++ b/tests/unittests/test_datasource/test_digitalocean.py @@ -197,7 +197,8 @@ class TestNetworkConvert(TestCase): @mock.patch('cloudinit.net.get_interfaces_by_mac') def _get_networking(self, m_get_by_mac): m_get_by_mac.return_value = { - '04:01:57:d1:9e:01': 'ens1', '04:01:57:d1:9e:02': 'ens2', + '04:01:57:d1:9e:01': 'ens1', + '04:01:57:d1:9e:02': 'ens2', 'b8:ae:ed:75:5f:9a': 'enp0s25', 'ae:cc:08:7c:88:00': 'meta2p1'} netcfg = digitalocean.convert_network_configuration( @@ -208,18 +209,33 @@ class TestNetworkConvert(TestCase): def test_networking_defined(self): netcfg = self._get_networking() self.assertIsNotNone(netcfg) + dns_defined = False - for nic_def in netcfg.get('config'): - print(json.dumps(nic_def, indent=3)) - n_type = nic_def.get('type') - n_subnets = nic_def.get('type') - n_name = nic_def.get('name') - n_mac = nic_def.get('mac_address') + for part in netcfg.get('config'): + n_type = part.get('type') + print("testing part ", n_type, "\n", json.dumps(part, indent=3)) + + if n_type == 'nameserver': + n_address = part.get('address') + self.assertIsNotNone(n_address) + self.assertEqual(len(n_address), 3) + + dns_resolvers = DO_META["dns"]["nameservers"] + for x in n_address: + self.assertIn(x, dns_resolvers) + dns_defined = True - self.assertIsNotNone(n_type) - self.assertIsNotNone(n_subnets) - self.assertIsNotNone(n_name) - self.assertIsNotNone(n_mac) + else: + n_subnets = part.get('type') + n_name = part.get('name') + n_mac = part.get('mac_address') + + self.assertIsNotNone(n_type) + self.assertIsNotNone(n_subnets) + self.assertIsNotNone(n_name) + self.assertIsNotNone(n_mac) + + self.assertTrue(dns_defined) def _get_nic_definition(self, int_type, expected_name): """helper function to return if_type (i.e. public) and the expected @@ -260,12 +276,6 @@ class TestNetworkConvert(TestCase): self.assertEqual(meta_def.get('mac'), nic_def.get('mac_address')) self.assertEqual('physical', nic_def.get('type')) - def _check_dns_nameservers(self, subn_def): - self.assertIn('dns_nameservers', subn_def) - expected_nameservers = DO_META['dns']['nameservers'] - nic_nameservers = subn_def.get('dns_nameservers') - self.assertEqual(expected_nameservers, nic_nameservers) - def test_public_interface_ipv6(self): """test public ipv6 addressing""" (nic_def, meta_def) = self._get_nic_definition('public', 'eth0') @@ -280,7 +290,6 @@ class TestNetworkConvert(TestCase): self.assertEqual(cidr_notated_address, subn_def.get('address')) self.assertEqual(ipv6_def.get('gateway'), subn_def.get('gateway')) - self._check_dns_nameservers(subn_def) def test_public_interface_ipv4(self): """test public ipv4 addressing""" @@ -293,7 +302,6 @@ class TestNetworkConvert(TestCase): self.assertEqual(ipv4_def.get('netmask'), subn_def.get('netmask')) self.assertEqual(ipv4_def.get('gateway'), subn_def.get('gateway')) - self._check_dns_nameservers(subn_def) def test_public_interface_anchor_ipv4(self): """test public ipv4 addressing""" diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py index 3feba86c..4815bdb6 100644 --- a/tests/unittests/test_handler/test_handler_yum_add_repo.py +++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py @@ -5,10 +5,13 @@ from cloudinit import util from .. import helpers -import configobj +try: + from configparser import ConfigParser +except ImportError: + from ConfigParser import ConfigParser import logging import shutil -from six import BytesIO +from six import StringIO import tempfile LOG = logging.getLogger(__name__) @@ -54,9 +57,9 @@ class TestConfig(helpers.FilesystemMockingTestCase): } self.patchUtils(self.tmp) cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, []) - contents = util.load_file("/etc/yum.repos.d/epel_testing.repo", - decode=False) - contents = configobj.ConfigObj(BytesIO(contents)) + contents = util.load_file("/etc/yum.repos.d/epel_testing.repo") + parser = ConfigParser() + parser.readfp(StringIO(contents)) expected = { 'epel_testing': { 'name': 'Extra Packages for Enterprise Linux 5 - Testing', @@ -67,6 +70,47 @@ class TestConfig(helpers.FilesystemMockingTestCase): 'gpgcheck': '1', } } - self.assertEqual(expected, dict(contents)) + for section in expected: + self.assertTrue(parser.has_section(section), + "Contains section {}".format(section)) + for k, v in expected[section].items(): + self.assertEqual(parser.get(section, k), v) + + def test_write_config_array(self): + cfg = { + 'yum_repos': { + 'puppetlabs-products': { + 'name': 'Puppet Labs Products El 6 - $basearch', + 'baseurl': + 'http://yum.puppetlabs.com/el/6/products/$basearch', + 'gpgkey': [ + 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-puppetlabs', + 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-puppet', + ], + 'enabled': True, + 'gpgcheck': True, + } + } + } + self.patchUtils(self.tmp) + cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, []) + contents = util.load_file("/etc/yum.repos.d/puppetlabs_products.repo") + parser = ConfigParser() + parser.readfp(StringIO(contents)) + expected = { + 'puppetlabs_products': { + 'name': 'Puppet Labs Products El 6 - $basearch', + 'baseurl': 'http://yum.puppetlabs.com/el/6/products/$basearch', + 'gpgkey': 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-puppetlabs\n' + 'file:///etc/pki/rpm-gpg/RPM-GPG-KEY-puppet', + 'enabled': '1', + 'gpgcheck': '1', + } + } + for section in expected: + self.assertTrue(parser.has_section(section), + "Contains section {}".format(section)) + for k, v in expected[section].items(): + self.assertEqual(parser.get(section, k), v) # vi: ts=4 expandtab diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py index 9cc5e4ab..89e75369 100644 --- a/tests/unittests/test_net.py +++ b/tests/unittests/test_net.py @@ -1463,13 +1463,16 @@ class TestNetRenderers(CiTestCase): class TestGetInterfacesByMac(CiTestCase): _data = {'devices': ['enp0s1', 'enp0s2', 'bond1', 'bridge1', - 'bridge1-nic', 'tun0'], + 'bridge1-nic', 'tun0', 'bond1.101'], 'bonds': ['bond1'], 'bridges': ['bridge1'], - 'own_macs': ['enp0s1', 'enp0s2', 'bridge1-nic', 'bridge1'], + 'vlans': ['bond1.101'], + 'own_macs': ['enp0s1', 'enp0s2', 'bridge1-nic', 'bridge1', + 'bond1.101'], 'macs': {'enp0s1': 'aa:aa:aa:aa:aa:01', 'enp0s2': 'aa:aa:aa:aa:aa:02', 'bond1': 'aa:aa:aa:aa:aa:01', + 'bond1.101': 'aa:aa:aa:aa:aa:01', 'bridge1': 'aa:aa:aa:aa:aa:03', 'bridge1-nic': 'aa:aa:aa:aa:aa:03', 'tun0': None}} @@ -1484,13 +1487,16 @@ class TestGetInterfacesByMac(CiTestCase): def _se_is_bridge(self, name): return name in self.data['bridges'] + def _se_is_vlan(self, name): + return name in self.data['vlans'] + def _se_interface_has_own_mac(self, name): return name in self.data['own_macs'] def _mock_setup(self): self.data = copy.deepcopy(self._data) mocks = ('get_devicelist', 'get_interface_mac', 'is_bridge', - 'interface_has_own_mac') + 'interface_has_own_mac', 'is_vlan') self.mocks = {} for n in mocks: m = mock.patch('cloudinit.net.' + n, @@ -1536,6 +1542,24 @@ class TestGetInterfacesByMac(CiTestCase): mock.call('b1')], any_order=True) + def test_excludes_vlans(self): + self._mock_setup() + # add a device 'b1', make all return they have their "own mac", + # set everything other than 'b1' to be a vlan. + # then expect b1 is the only thing left. + self.data['macs']['b1'] = 'aa:aa:aa:aa:aa:b1' + self.data['devices'].append('b1') + self.data['bonds'] = [] + self.data['bridges'] = [] + self.data['own_macs'] = self.data['devices'] + self.data['vlans'] = [f for f in self.data['devices'] if f != "b1"] + ret = net.get_interfaces_by_mac() + self.assertEqual({'aa:aa:aa:aa:aa:b1': 'b1'}, ret) + self.mocks['is_vlan'].assert_has_calls( + [mock.call('bridge1'), mock.call('enp0s1'), mock.call('bond1'), + mock.call('b1')], + any_order=True) + def _gzip_data(data): with io.BytesIO() as iobuf: diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index ab74311e..5d21b4b7 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -103,8 +103,8 @@ class TestWriteFile(helpers.TestCase): self.assertTrue(os.path.isdir(dirname)) self.assertTrue(os.path.isfile(path)) - def test_custom_mode(self): - """Verify custom mode works properly.""" + def test_explicit_mode(self): + """Verify explicit file mode works properly.""" path = os.path.join(self.tmp, "NewFile.txt") contents = "Hey there" @@ -115,6 +115,35 @@ class TestWriteFile(helpers.TestCase): file_stat = os.stat(path) self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode)) + def test_copy_mode_no_existing(self): + """Verify that file is created with mode 0o644 if copy_mode + is true and there is no prior existing file.""" + path = os.path.join(self.tmp, "NewFile.txt") + contents = "Hey there" + + util.write_file(path, contents, copy_mode=True) + + self.assertTrue(os.path.exists(path)) + self.assertTrue(os.path.isfile(path)) + file_stat = os.stat(path) + self.assertEqual(0o644, stat.S_IMODE(file_stat.st_mode)) + + def test_copy_mode_with_existing(self): + """Verify that file is created using mode of existing file + if copy_mode is true.""" + path = os.path.join(self.tmp, "NewFile.txt") + contents = "Hey there" + + open(path, 'w').close() + os.chmod(path, 0o666) + + util.write_file(path, contents, copy_mode=True) + + self.assertTrue(os.path.exists(path)) + self.assertTrue(os.path.isfile(path)) + file_stat = os.stat(path) + self.assertEqual(0o666, stat.S_IMODE(file_stat.st_mode)) + def test_custom_omode(self): """Verify custom omode works properly.""" path = os.path.join(self.tmp, "NewFile.txt") |