summaryrefslogtreecommitdiff
path: root/tests/protocol/test_wire.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/protocol/test_wire.py')
-rw-r--r--tests/protocol/test_wire.py237
1 files changed, 219 insertions, 18 deletions
diff --git a/tests/protocol/test_wire.py b/tests/protocol/test_wire.py
index bd3acaf..8c9cc02 100644
--- a/tests/protocol/test_wire.py
+++ b/tests/protocol/test_wire.py
@@ -15,31 +15,23 @@
# Requires Python 2.4+ and Openssl 1.0+
#
-from tests.tools import *
+from azurelinuxagent.common.protocol.wire import *
from tests.protocol.mockwiredata import *
-import uuid
-import unittest
-import os
-import time
-from azurelinuxagent.common.utils.restutil import httpclient
-from azurelinuxagent.common.utils.cryptutil import CryptUtil
-from azurelinuxagent.common.protocol.restapi import *
-from azurelinuxagent.common.protocol.wire import WireClient, WireProtocol, \
- TRANSPORT_PRV_FILE_NAME, \
- TRANSPORT_CERT_FILE_NAME
data_with_bom = b'\xef\xbb\xbfhehe'
+testurl = 'http://foo'
+testtype = 'BlockBlob'
+wireserver_url = '168.63.129.16'
@patch("time.sleep")
@patch("azurelinuxagent.common.protocol.wire.CryptUtil")
@patch("azurelinuxagent.common.protocol.wire.restutil")
class TestWireProtocolGetters(AgentTestCase):
-
def _test_getters(self, test_data, mock_restutil, MockCryptUtil, _):
mock_restutil.http_get.side_effect = test_data.mock_http_get
MockCryptUtil.side_effect = test_data.mock_crypt_util
- protocol = WireProtocol("foo.bar")
+ protocol = WireProtocol(wireserver_url)
protocol.detect()
protocol.get_vminfo()
protocol.get_certs()
@@ -47,9 +39,9 @@ class TestWireProtocolGetters(AgentTestCase):
for ext_handler in ext_handlers.extHandlers:
protocol.get_ext_handler_pkgs(ext_handler)
- crt1 = os.path.join(self.tmp_dir,
- '33B0ABCE4673538650971C10F7D7397E71561F35.crt')
- crt2 = os.path.join(self.tmp_dir,
+ crt1 = os.path.join(self.tmp_dir,
+ '33B0ABCE4673538650971C10F7D7397E71561F35.crt')
+ crt2 = os.path.join(self.tmp_dir,
'4037FBF5F1F3014F99B5D6C7799E9B20E6871CB3.crt')
prv2 = os.path.join(self.tmp_dir,
'4037FBF5F1F3014F99B5D6C7799E9B20E6871CB3.prv')
@@ -57,7 +49,7 @@ class TestWireProtocolGetters(AgentTestCase):
self.assertTrue(os.path.isfile(crt1))
self.assertTrue(os.path.isfile(crt2))
self.assertTrue(os.path.isfile(prv2))
-
+
def test_getters(self, *args):
"""Normal case"""
test_data = WireProtocolData(DATA_FILE)
@@ -72,11 +64,220 @@ class TestWireProtocolGetters(AgentTestCase):
"""Extensions without any settings"""
test_data = WireProtocolData(DATA_FILE_EXT_NO_SETTINGS)
self._test_getters(test_data, *args)
-
+
def test_getters_ext_no_public(self, *args):
"""Extensions without any public settings"""
test_data = WireProtocolData(DATA_FILE_EXT_NO_PUBLIC)
self._test_getters(test_data, *args)
+ def test_call_storage_kwargs(self,
+ mock_restutil,
+ mock_cryptutil,
+ mock_sleep):
+ from azurelinuxagent.common.utils import restutil
+ with patch.object(restutil, 'http_get') as http_patch:
+ http_req = restutil.http_get
+ url = testurl
+ headers = {}
+
+ # no kwargs
+ WireClient.call_storage_service(http_req)
+ # kwargs, no chk_proxy
+ WireClient.call_storage_service(http_req,
+ url,
+ headers)
+ # kwargs, chk_proxy False
+ WireClient.call_storage_service(http_req,
+ url,
+ headers,
+ chk_proxy=False)
+ # kwargs, chk_proxy True
+ WireClient.call_storage_service(http_req,
+ url,
+ headers,
+ chk_proxy=True)
+ # assert
+ self.assertTrue(http_patch.call_count == 4)
+ for c in http_patch.call_args_list:
+ self.assertTrue(c[-1]['chk_proxy'] == True)
+
+ def test_status_blob_parsing(self, *args):
+ wire_protocol_client = WireProtocol(wireserver_url).client
+ wire_protocol_client.ext_conf = ExtensionsConfig(WireProtocolData(DATA_FILE).ext_conf)
+ self.assertEqual(wire_protocol_client.ext_conf.status_upload_blob,
+ u'https://yuezhatest.blob.core.windows.net/vhds/test'
+ u'-cs12.test-cs12.test-cs12.status?sr=b&sp=rw&se'
+ u'=9999-01-01&sk=key1&sv=2014-02-14&sig'
+ u'=hfRh7gzUE7sUtYwke78IOlZOrTRCYvkec4hGZ9zZzXo%3D')
+ self.assertEqual(wire_protocol_client.ext_conf.status_upload_blob_type,
+ u'BlockBlob')
+ pass
+
+ def test_get_host_ga_plugin(self, *args):
+ wire_protocol_client = WireProtocol(wireserver_url).client
+ goal_state = GoalState(WireProtocolData(DATA_FILE).goal_state)
+
+ with patch.object(WireClient, "get_goal_state", return_value = goal_state) as patch_get_goal_state:
+ host_plugin = wire_protocol_client.get_host_plugin()
+ self.assertEqual(goal_state.container_id, host_plugin.container_id)
+ self.assertEqual(goal_state.role_config_name, host_plugin.role_config_name)
+ patch_get_goal_state.assert_called_once()
+
+ def test_download_ext_handler_pkg_fallback(self, *args):
+ ext_uri = 'extension_uri'
+ host_uri = 'host_uri'
+ mock_host = HostPluginProtocol(host_uri, 'container_id', 'role_config')
+ with patch.object(restutil,
+ "http_request",
+ side_effect=IOError) as patch_http:
+ with patch.object(WireClient,
+ "get_host_plugin",
+ return_value=mock_host):
+ with patch.object(HostPluginProtocol,
+ "get_artifact_request",
+ return_value=[host_uri, {}]) as patch_request:
+
+ WireProtocol(wireserver_url).download_ext_handler_pkg(ext_uri)
+
+ self.assertEqual(patch_http.call_count, 2)
+ self.assertEqual(patch_request.call_count, 1)
+
+ self.assertEqual(patch_http.call_args_list[0][0][1],
+ ext_uri)
+ self.assertEqual(patch_http.call_args_list[1][0][1],
+ host_uri)
+
+ def test_upload_status_blob_default(self, *args):
+ wire_protocol_client = WireProtocol(wireserver_url).client
+ wire_protocol_client.ext_conf = ExtensionsConfig(None)
+ wire_protocol_client.ext_conf.status_upload_blob = testurl
+
+ with patch.object(WireClient, "get_goal_state") as patch_get_goal_state:
+ with patch.object(HostPluginProtocol, "put_vm_status") as patch_host_ga_plugin_upload:
+ with patch.object(StatusBlob, "upload", return_value = True) as patch_default_upload:
+ wire_protocol_client.upload_status_blob()
+
+ patch_default_upload.assert_called_once_with(testurl)
+ patch_get_goal_state.assert_not_called()
+ patch_host_ga_plugin_upload.assert_not_called()
+
+ def test_upload_status_blob_host_ga_plugin(self, *args):
+ wire_protocol_client = WireProtocol(wireserver_url).client
+ wire_protocol_client.ext_conf = ExtensionsConfig(None)
+ wire_protocol_client.ext_conf.status_upload_blob = testurl
+ wire_protocol_client.ext_conf.status_upload_blob_type = testtype
+ goal_state = GoalState(WireProtocolData(DATA_FILE).goal_state)
+
+ with patch.object(HostPluginProtocol, "put_vm_status") as patch_host_ga_plugin_upload:
+ with patch.object(StatusBlob, "upload", return_value=False) as patch_default_upload:
+ wire_protocol_client.get_goal_state = Mock(return_value = goal_state)
+ wire_protocol_client.upload_status_blob()
+
+ patch_default_upload.assert_called_once_with(testurl)
+ wire_protocol_client.get_goal_state.assert_called_once()
+ patch_host_ga_plugin_upload.assert_called_once_with(wire_protocol_client.status_blob, testurl, testtype)
+
+ def test_get_in_vm_artifacts_profile_blob_not_available(self, *args):
+ wire_protocol_client = WireProtocol(wireserver_url).client
+ wire_protocol_client.ext_conf = ExtensionsConfig(None)
+
+ # Test when artifacts_profile_blob is null/None
+ self.assertEqual(None, wire_protocol_client.get_artifacts_profile())
+
+ #Test when artifacts_profile_blob is whitespace
+ wire_protocol_client.ext_conf.artifacts_profile_blob = " "
+ self.assertEqual(None, wire_protocol_client.get_artifacts_profile())
+
+ def test_get_in_vm_artifacts_profile_response_body_not_valid(self, *args):
+ wire_protocol_client = WireProtocol(wireserver_url).client
+ wire_protocol_client.ext_conf = ExtensionsConfig(None)
+ wire_protocol_client.ext_conf.artifacts_profile_blob = testurl
+ goal_state = GoalState(WireProtocolData(DATA_FILE).goal_state)
+ wire_protocol_client.get_goal_state = Mock(return_value=goal_state)
+
+ with patch.object(HostPluginProtocol, "get_artifact_request",
+ return_value = ['dummy_url', {}]) as host_plugin_get_artifact_url_and_headers:
+ #Test when response body is None
+ wire_protocol_client.call_storage_service = Mock(return_value=MockResponse(None, 200))
+ in_vm_artifacts_profile = wire_protocol_client.get_artifacts_profile()
+ self.assertTrue(in_vm_artifacts_profile is None)
+
+ #Test when response body is None
+ wire_protocol_client.call_storage_service = Mock(return_value=MockResponse(' '.encode('utf-8'), 200))
+ in_vm_artifacts_profile = wire_protocol_client.get_artifacts_profile()
+ self.assertTrue(in_vm_artifacts_profile is None)
+
+ #Test when response body is None
+ wire_protocol_client.call_storage_service = Mock(return_value=MockResponse('{ }'.encode('utf-8'), 200))
+ in_vm_artifacts_profile = wire_protocol_client.get_artifacts_profile()
+ self.assertEqual(dict(), in_vm_artifacts_profile.__dict__,
+ 'If artifacts_profile_blob has empty json dictionary, in_vm_artifacts_profile '
+ 'should contain nothing')
+
+ host_plugin_get_artifact_url_and_headers.assert_called_with(testurl)
+
+
+ def test_get_in_vm_artifacts_profile_default(self, *args):
+ wire_protocol_client = WireProtocol(wireserver_url).client
+ wire_protocol_client.ext_conf = ExtensionsConfig(None)
+ wire_protocol_client.ext_conf.artifacts_profile_blob = testurl
+ goal_state = GoalState(WireProtocolData(DATA_FILE).goal_state)
+ wire_protocol_client.get_goal_state = Mock(return_value=goal_state)
+
+ wire_protocol_client.call_storage_service = Mock(return_value=MockResponse('{"onHold": "true"}'.encode('utf-8'), 200))
+ in_vm_artifacts_profile = wire_protocol_client.get_artifacts_profile()
+ self.assertEqual(dict(onHold='true'), in_vm_artifacts_profile.__dict__)
+ self.assertTrue(in_vm_artifacts_profile.is_on_hold())
+
+ @patch("time.sleep")
+ def test_fetch_manifest_fallback(self, patch_sleep, *args):
+ uri1 = ExtHandlerVersionUri()
+ uri1.uri = 'ext_uri'
+ uris = DataContractList(ExtHandlerVersionUri)
+ uris.append(uri1)
+ host_uri = 'host_uri'
+ mock_host = HostPluginProtocol(host_uri,
+ 'container_id',
+ 'role_config')
+ client = WireProtocol(wireserver_url).client
+ with patch.object(WireClient,
+ "fetch",
+ return_value=None) as patch_fetch:
+ with patch.object(WireClient,
+ "get_host_plugin",
+ return_value=mock_host):
+ with patch.object(HostPluginProtocol,
+ "get_artifact_request",
+ return_value=[host_uri, {}]):
+ self.assertRaises(ProtocolError, client.fetch_manifest, uris)
+ self.assertEqual(patch_fetch.call_count, 2)
+ self.assertEqual(patch_fetch.call_args_list[0][0][0], uri1.uri)
+ self.assertEqual(patch_fetch.call_args_list[1][0][0], host_uri)
+
+ def test_get_in_vm_artifacts_profile_host_ga_plugin(self, *args):
+ wire_protocol_client = WireProtocol(wireserver_url).client
+ wire_protocol_client.ext_conf = ExtensionsConfig(None)
+ wire_protocol_client.ext_conf.artifacts_profile_blob = testurl
+ goal_state = GoalState(WireProtocolData(DATA_FILE).goal_state)
+ wire_protocol_client.get_goal_state = Mock(return_value=goal_state)
+ wire_protocol_client.fetch = Mock(side_effect=[None, '{"onHold": "true"}'.encode('utf-8')])
+ with patch.object(HostPluginProtocol,
+ "get_artifact_request",
+ return_value=['dummy_url', {}]) as artifact_request:
+ in_vm_artifacts_profile = wire_protocol_client.get_artifacts_profile()
+ self.assertTrue(in_vm_artifacts_profile is not None)
+ self.assertEqual(dict(onHold='true'), in_vm_artifacts_profile.__dict__)
+ self.assertTrue(in_vm_artifacts_profile.is_on_hold())
+ artifact_request.assert_called_once_with(testurl)
+
+
+class MockResponse:
+ def __init__(self, body, status_code):
+ self.body = body
+ self.status = status_code
+
+ def read(self):
+ return self.body
+
if __name__ == '__main__':
unittest.main()