# This file is part of cloud-init. See LICENSE file for license information. from cloudinit import helpers from cloudinit import util from cloudinit.sources.DataSourceCloudStack import ( DataSourceCloudStack, get_latest_lease) from cloudinit.tests.helpers import CiTestCase, ExitStack, mock import os import time MOD_PATH = 'cloudinit.sources.DataSourceCloudStack' DS_PATH = MOD_PATH + '.DataSourceCloudStack' class TestCloudStackPasswordFetching(CiTestCase): def setUp(self): super(TestCloudStackPasswordFetching, self).setUp() self.patches = ExitStack() self.addCleanup(self.patches.close) mod_name = MOD_PATH self.patches.enter_context(mock.patch('{0}.ec2'.format(mod_name))) self.patches.enter_context(mock.patch('{0}.uhelp'.format(mod_name))) default_gw = "192.201.20.0" get_latest_lease = mock.MagicMock(return_value=None) self.patches.enter_context(mock.patch( mod_name + '.get_latest_lease', get_latest_lease)) get_default_gw = mock.MagicMock(return_value=default_gw) self.patches.enter_context(mock.patch( mod_name + '.get_default_gateway', get_default_gw)) get_networkd_server_address = mock.MagicMock(return_value=None) self.patches.enter_context(mock.patch( mod_name + '.dhcp.networkd_get_option_from_leases', get_networkd_server_address)) self.tmp = self.tmp_dir() def _set_password_server_response(self, response_string): subp = mock.MagicMock(return_value=(response_string, '')) self.patches.enter_context( mock.patch('cloudinit.sources.DataSourceCloudStack.util.subp', subp)) return subp def test_empty_password_doesnt_create_config(self): self._set_password_server_response('') ds = DataSourceCloudStack( {}, None, helpers.Paths({'run_dir': self.tmp})) ds.get_data() self.assertEqual({}, ds.get_config_obj()) def test_saved_password_doesnt_create_config(self): self._set_password_server_response('saved_password') ds = DataSourceCloudStack( {}, None, helpers.Paths({'run_dir': self.tmp})) ds.get_data() self.assertEqual({}, ds.get_config_obj()) @mock.patch(DS_PATH + '.wait_for_metadata_service') def test_password_sets_password(self, m_wait): m_wait.return_value = True password = 'SekritSquirrel' self._set_password_server_response(password) ds = DataSourceCloudStack( {}, None, helpers.Paths({'run_dir': self.tmp})) ds.get_data() self.assertEqual(password, ds.get_config_obj()['password']) @mock.patch(DS_PATH + '.wait_for_metadata_service') def test_bad_request_doesnt_stop_ds_from_working(self, m_wait): m_wait.return_value = True self._set_password_server_response('bad_request') ds = DataSourceCloudStack( {}, None, helpers.Paths({'run_dir': self.tmp})) self.assertTrue(ds.get_data()) def assertRequestTypesSent(self, subp, expected_request_types): request_types = [] for call in subp.call_args_list: args = call[0][0] for arg in args: if arg.startswith('DomU_Request'): request_types.append(arg.split()[1]) self.assertEqual(expected_request_types, request_types) @mock.patch(DS_PATH + '.wait_for_metadata_service') def test_valid_response_means_password_marked_as_saved(self, m_wait): m_wait.return_value = True password = 'SekritSquirrel' subp = self._set_password_server_response(password) ds = DataSourceCloudStack( {}, None, helpers.Paths({'run_dir': self.tmp})) ds.get_data() self.assertRequestTypesSent(subp, ['send_my_password', 'saved_password']) def _check_password_not_saved_for(self, response_string): subp = self._set_password_server_response(response_string) ds = DataSourceCloudStack( {}, None, helpers.Paths({'run_dir': self.tmp})) with mock.patch(DS_PATH + '.wait_for_metadata_service') as m_wait: m_wait.return_value = True ds.get_data() self.assertRequestTypesSent(subp, ['send_my_password']) def test_password_not_saved_if_empty(self): self._check_password_not_saved_for('') def test_password_not_saved_if_already_saved(self): self._check_password_not_saved_for('saved_password') def test_password_not_saved_if_bad_request(self): self._check_password_not_saved_for('bad_request') class TestGetLatestLease(CiTestCase): def _populate_dir_list(self, bdir, files): """populate_dir_list([(name, data), (name, data)]) writes files to bdir, and updates timestamps to ensure that their mtime increases with each file.""" start = int(time.time()) for num, fname in enumerate(reversed(files)): fpath = os.path.sep.join((bdir, fname)) util.write_file(fpath, fname.encode()) os.utime(fpath, (start - num, start - num)) def _pop_and_test(self, files, expected): lease_d = self.tmp_dir() self._populate_dir_list(lease_d, files) self.assertEqual(self.tmp_path(expected, lease_d), get_latest_lease(lease_d)) def test_skips_dhcpv6_files(self): """files started with dhclient6 should be skipped.""" expected = "dhclient.lease" self._pop_and_test([expected, "dhclient6.lease"], expected) def test_selects_dhclient_dot_files(self): """files named dhclient.lease or dhclient.leases should be used. Ubuntu names files dhclient.eth0.leases dhclient6.leases and sometimes dhclient.leases.""" self._pop_and_test(["dhclient.lease"], "dhclient.lease") self._pop_and_test(["dhclient.leases"], "dhclient.leases") def test_selects_dhclient_dash_files(self): """files named dhclient-lease or dhclient-leases should be used. Redhat/Centos names files with dhclient--eth0.lease (centos 7) or dhclient-eth0.leases (centos 6). """ self._pop_and_test(["dhclient-eth0.lease"], "dhclient-eth0.lease") self._pop_and_test(["dhclient--eth0.lease"], "dhclient--eth0.lease") def test_ignores_by_extension(self): """only .lease or .leases file should be considered.""" self._pop_and_test(["dhclient.lease", "dhclient.lease.bk", "dhclient.lease-old", "dhclient.leaselease"], "dhclient.lease") def test_selects_newest_matching(self): """If multiple files match, the newest written should be used.""" lease_d = self.tmp_dir() valid_1 = "dhclient.leases" valid_2 = "dhclient.lease" valid_1_path = self.tmp_path(valid_1, lease_d) valid_2_path = self.tmp_path(valid_2, lease_d) self._populate_dir_list(lease_d, [valid_1, valid_2]) self.assertEqual(valid_2_path, get_latest_lease(lease_d)) # now update mtime on valid_2 to be older than valid_1 and re-check. mtime = int(os.path.getmtime(valid_1_path)) - 1 os.utime(valid_2_path, (mtime, mtime)) self.assertEqual(valid_1_path, get_latest_lease(lease_d)) # vi: ts=4 expandtab