summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_cloudstack.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource/test_cloudstack.py')
-rw-r--r--tests/unittests/test_datasource/test_cloudstack.py86
1 files changed, 86 insertions, 0 deletions
diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py
new file mode 100644
index 00000000..656d80d1
--- /dev/null
+++ b/tests/unittests/test_datasource/test_cloudstack.py
@@ -0,0 +1,86 @@
+from cloudinit import helpers
+from cloudinit.sources.DataSourceCloudStack import DataSourceCloudStack
+from ..helpers import TestCase
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
+
+
+class TestCloudStackPasswordFetching(TestCase):
+
+ def setUp(self):
+ super(TestCloudStackPasswordFetching, self).setUp()
+ self.patches = ExitStack()
+ self.addCleanup(self.patches.close)
+ mod_name = 'cloudinit.sources.DataSourceCloudStack'
+ self.patches.enter_context(mock.patch('{0}.ec2'.format(mod_name)))
+ self.patches.enter_context(mock.patch('{0}.uhelp'.format(mod_name)))
+
+ def _set_password_server_response(self, response_string):
+ subp = mock.MagicMock(return_value=(response_string, ''))
+ self.patches.enter_context(
+ mock.patch('cloudinit.sources.DataSourceCloudStack.util.subp',
+ subp))
+ return subp
+
+ def test_empty_password_doesnt_create_config(self):
+ self._set_password_server_response('')
+ ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds.get_data()
+ self.assertEqual({}, ds.get_config_obj())
+
+ def test_saved_password_doesnt_create_config(self):
+ self._set_password_server_response('saved_password')
+ ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds.get_data()
+ self.assertEqual({}, ds.get_config_obj())
+
+ def test_password_sets_password(self):
+ password = 'SekritSquirrel'
+ self._set_password_server_response(password)
+ ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds.get_data()
+ self.assertEqual(password, ds.get_config_obj()['password'])
+
+ def test_bad_request_doesnt_stop_ds_from_working(self):
+ self._set_password_server_response('bad_request')
+ ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ self.assertTrue(ds.get_data())
+
+ def assertRequestTypesSent(self, subp, expected_request_types):
+ request_types = []
+ for call in subp.call_args_list:
+ args = call[0][0]
+ for arg in args:
+ if arg.startswith('DomU_Request'):
+ request_types.append(arg.split()[1])
+ self.assertEqual(expected_request_types, request_types)
+
+ def test_valid_response_means_password_marked_as_saved(self):
+ password = 'SekritSquirrel'
+ subp = self._set_password_server_response(password)
+ ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds.get_data()
+ self.assertRequestTypesSent(subp,
+ ['send_my_password', 'saved_password'])
+
+ def _check_password_not_saved_for(self, response_string):
+ subp = self._set_password_server_response(response_string)
+ ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds.get_data()
+ self.assertRequestTypesSent(subp, ['send_my_password'])
+
+ def test_password_not_saved_if_empty(self):
+ self._check_password_not_saved_for('')
+
+ def test_password_not_saved_if_already_saved(self):
+ self._check_password_not_saved_for('saved_password')
+
+ def test_password_not_saved_if_bad_request(self):
+ self._check_password_not_saved_for('bad_request')