diff options
Diffstat (limited to 'azurelinuxagent/common/protocol/wire.py')
-rw-r--r-- | azurelinuxagent/common/protocol/wire.py | 343 |
1 files changed, 204 insertions, 139 deletions
diff --git a/azurelinuxagent/common/protocol/wire.py b/azurelinuxagent/common/protocol/wire.py index d731e11..4f3b7e0 100644 --- a/azurelinuxagent/common/protocol/wire.py +++ b/azurelinuxagent/common/protocol/wire.py @@ -26,7 +26,8 @@ import azurelinuxagent.common.conf as conf import azurelinuxagent.common.utils.fileutil as fileutil import azurelinuxagent.common.utils.textutil as textutil -from azurelinuxagent.common.exception import ProtocolNotFoundError +from azurelinuxagent.common.exception import ProtocolNotFoundError, \ + ResourceGoneError from azurelinuxagent.common.future import httpclient, bytebuffer from azurelinuxagent.common.protocol.hostplugin import HostPluginProtocol from azurelinuxagent.common.protocol.restapi import * @@ -96,7 +97,10 @@ class WireProtocol(Protocol): cryptutil = CryptUtil(conf.get_openssl_cmd()) cryptutil.gen_transport_cert(trans_prv_file, trans_cert_file) - self.client.update_goal_state(forced=True) + self.update_goal_state(forced=True) + + def update_goal_state(self, forced=False, max_retry=3): + self.client.update_goal_state(forced=forced, max_retry=max_retry) def get_vminfo(self): goal_state = self.client.get_goal_state() @@ -117,7 +121,7 @@ class WireProtocol(Protocol): def get_vmagent_manifests(self): # Update goal state to get latest extensions config - self.client.update_goal_state() + self.update_goal_state() goal_state = self.client.get_goal_state() ext_conf = self.client.get_ext_conf() return ext_conf.vmagent_manifests, goal_state.incarnation @@ -130,7 +134,7 @@ class WireProtocol(Protocol): def get_ext_handlers(self): logger.verbose("Get extension handler config") # Update goal state to get latest extensions config - self.client.update_goal_state() + self.update_goal_state() goal_state = self.client.get_goal_state() ext_conf = self.client.get_ext_conf() # In wire protocol, incarnation is equivalent to ETag @@ -533,29 +537,27 @@ class WireClient(object): self.req_count = 0 def call_wireserver(self, http_req, *args, **kwargs): - """ - Call wire server; handle throttling (403), resource gone (410) and - service unavailable (503). - """ self.prevent_throttling() - for retry in range(0, 3): + + try: + # Never use the HTTP proxy for wireserver + kwargs['use_proxy'] = False resp = http_req(*args, **kwargs) - if resp.status == httpclient.FORBIDDEN: - logger.warn("Sending too many requests to wire server. ") - logger.info("Sleeping {0}s to avoid throttling.", - LONG_WAITING_INTERVAL) - time.sleep(LONG_WAITING_INTERVAL) - elif resp.status == httpclient.SERVICE_UNAVAILABLE: - logger.warn("Service temporarily unavailable, sleeping {0}s " - "before retrying.", LONG_WAITING_INTERVAL) - time.sleep(LONG_WAITING_INTERVAL) - elif resp.status == httpclient.GONE: - msg = args[0] if len(args) > 0 else "" - raise WireProtocolResourceGone(msg) - else: - return resp - raise ProtocolError(("Calling wire server failed: " - "{0}").format(resp.status)) + except Exception as e: + raise ProtocolError("[Wireserver Exception] {0}".format( + ustr(e))) + + if resp is not None and resp.status == httpclient.GONE: + msg = args[0] if len(args) > 0 else "" + raise WireProtocolResourceGone(msg) + + elif restutil.request_failed(resp): + msg = "[Wireserver Failed] URI {0} ".format(args[0]) + if resp is not None: + msg += " [HTTP Failed] Status Code {0}".format(resp.status) + raise ProtocolError(msg) + + return resp def decode_config(self, data): if data is None: @@ -565,16 +567,9 @@ class WireClient(object): return xml_text def fetch_config(self, uri, headers): - try: - resp = self.call_wireserver(restutil.http_get, - uri, - headers=headers) - except HttpError as e: - raise ProtocolError(ustr(e)) - - if resp.status != httpclient.OK: - raise ProtocolError("{0} - {1}".format(resp.status, uri)) - + resp = self.call_wireserver(restutil.http_get, + uri, + headers=headers) return self.decode_config(resp.read()) def fetch_cache(self, local_file): @@ -589,29 +584,17 @@ class WireClient(object): try: fileutil.write_file(local_file, data) except IOError as e: + fileutil.clean_ioerror(e, + paths=[local_file]) raise ProtocolError("Failed to write cache: {0}".format(e)) @staticmethod def call_storage_service(http_req, *args, **kwargs): - """ - Call storage service, handle SERVICE_UNAVAILABLE(503) - """ - # Default to use the configured HTTP proxy - if not 'chk_proxy' in kwargs or kwargs['chk_proxy'] is None: - kwargs['chk_proxy'] = True + if not 'use_proxy' in kwargs or kwargs['use_proxy'] is None: + kwargs['use_proxy'] = True - for retry in range(0, 3): - resp = http_req(*args, **kwargs) - if resp.status == httpclient.SERVICE_UNAVAILABLE: - logger.warn("Storage service is temporarily unavailable. ") - logger.info("Will retry in {0} seconds. ", - LONG_WAITING_INTERVAL) - time.sleep(LONG_WAITING_INTERVAL) - else: - return resp - raise ProtocolError(("Calling storage endpoint failed: " - "{0}").format(resp.status)) + return http_req(*args, **kwargs) def fetch_manifest(self, version_uris): logger.verbose("Fetch manifest") @@ -619,47 +602,61 @@ class WireClient(object): response = None if not HostPluginProtocol.is_default_channel(): response = self.fetch(version.uri) + if not response: if HostPluginProtocol.is_default_channel(): logger.verbose("Using host plugin as default channel") else: - logger.verbose("Manifest could not be downloaded, falling back to host plugin") - host = self.get_host_plugin() - uri, headers = host.get_artifact_request(version.uri) - response = self.fetch(uri, headers, chk_proxy=False) - if not response: - host = self.get_host_plugin(force_update=True) - logger.info("Retry fetch in {0} seconds", - SHORT_WAITING_INTERVAL) - time.sleep(SHORT_WAITING_INTERVAL) - else: - host.manifest_uri = version.uri - logger.verbose("Manifest downloaded successfully from host plugin") - if not HostPluginProtocol.is_default_channel(): - logger.info("Setting host plugin as default channel") - HostPluginProtocol.set_default_channel(True) + logger.verbose("Failed to download manifest, " + "switching to host plugin") + + try: + host = self.get_host_plugin() + uri, headers = host.get_artifact_request(version.uri) + response = self.fetch(uri, headers, use_proxy=False) + + # If the HostPlugin rejects the request, + # let the error continue, but set to use the HostPlugin + except ResourceGoneError: + HostPluginProtocol.set_default_channel(True) + raise + + host.manifest_uri = version.uri + logger.verbose("Manifest downloaded successfully from host plugin") + if not HostPluginProtocol.is_default_channel(): + logger.info("Setting host plugin as default channel") + HostPluginProtocol.set_default_channel(True) + if response: return response + raise ProtocolError("Failed to fetch manifest from all sources") - def fetch(self, uri, headers=None, chk_proxy=None): + def fetch(self, uri, headers=None, use_proxy=None): logger.verbose("Fetch [{0}] with headers [{1}]", uri, headers) - return_value = None try: resp = self.call_storage_service( - restutil.http_get, - uri, - headers, - chk_proxy=chk_proxy) - if resp.status == httpclient.OK: - return_value = self.decode_config(resp.read()) - else: - logger.warn("Could not fetch {0} [{1}]", - uri, - HostPluginProtocol.read_response_error(resp)) + restutil.http_get, + uri, + headers=headers, + use_proxy=use_proxy) + + if restutil.request_failed(resp): + msg = "[Storage Failed] URI {0} ".format(uri) + if resp is not None: + msg += restutil.read_response_error(resp) + logger.warn(msg) + raise ProtocolError(msg) + + return self.decode_config(resp.read()) + except (HttpError, ProtocolError) as e: logger.verbose("Fetch failed from [{0}]: {1}", uri, e) - return return_value + + if isinstance(e, ResourceGoneError): + raise + + return None def update_hosting_env(self, goal_state): if goal_state.hosting_env_uri is None: @@ -734,6 +731,12 @@ class WireClient(object): self.host_plugin.container_id = goal_state.container_id self.host_plugin.role_config_name = goal_state.role_config_name return + + except ProtocolError: + if retry < max_retry-1: + continue + raise + except WireProtocolResourceGone: logger.info("Incarnation is out of date. Update goalstate.") xml_text = self.fetch_config(uri, self.get_header()) @@ -791,20 +794,45 @@ class WireClient(object): return self.ext_conf def get_ext_manifest(self, ext_handler, goal_state): - local_file = MANIFEST_FILE_NAME.format(ext_handler.name, - goal_state.incarnation) - local_file = os.path.join(conf.get_lib_dir(), local_file) - xml_text = self.fetch_manifest(ext_handler.versionUris) - self.save_cache(local_file, xml_text) - return ExtensionManifest(xml_text) + for update_goal_state in [False, True]: + try: + if update_goal_state: + self.update_goal_state(forced=True) + goal_state = self.get_goal_state() + + local_file = MANIFEST_FILE_NAME.format( + ext_handler.name, + goal_state.incarnation) + local_file = os.path.join(conf.get_lib_dir(), local_file) + xml_text = self.fetch_manifest(ext_handler.versionUris) + self.save_cache(local_file, xml_text) + return ExtensionManifest(xml_text) + + except ResourceGoneError: + continue + + raise ProtocolError("Failed to retrieve extension manifest") def get_gafamily_manifest(self, vmagent_manifest, goal_state): - local_file = MANIFEST_FILE_NAME.format(vmagent_manifest.family, - goal_state.incarnation) - local_file = os.path.join(conf.get_lib_dir(), local_file) - xml_text = self.fetch_manifest(vmagent_manifest.versionsManifestUris) - fileutil.write_file(local_file, xml_text) - return ExtensionManifest(xml_text) + for update_goal_state in [False, True]: + try: + if update_goal_state: + self.update_goal_state(forced=True) + goal_state = self.get_goal_state() + + local_file = MANIFEST_FILE_NAME.format( + vmagent_manifest.family, + goal_state.incarnation) + local_file = os.path.join(conf.get_lib_dir(), local_file) + xml_text = self.fetch_manifest( + vmagent_manifest.versionsManifestUris) + fileutil.write_file(local_file, xml_text) + return ExtensionManifest(xml_text) + + except ResourceGoneError: + continue + + raise ProtocolError("Failed to retrieve GAFamily manifest") def check_wire_protocol_version(self): uri = VERSION_INFO_URI.format(self.endpoint) @@ -823,39 +851,55 @@ class WireClient(object): raise ProtocolNotFoundError(error) def upload_status_blob(self): - ext_conf = self.get_ext_conf() + for update_goal_state in [False, True]: + try: + if update_goal_state: + self.update_goal_state(forced=True) - blob_uri = ext_conf.status_upload_blob - blob_type = ext_conf.status_upload_blob_type + ext_conf = self.get_ext_conf() - if blob_uri is not None: + blob_uri = ext_conf.status_upload_blob + blob_type = ext_conf.status_upload_blob_type - if not blob_type in ["BlockBlob", "PageBlob"]: - blob_type = "BlockBlob" - logger.verbose("Status Blob type is unspecified " - "-- assuming it is a BlockBlob") + if blob_uri is not None: + + if not blob_type in ["BlockBlob", "PageBlob"]: + blob_type = "BlockBlob" + logger.verbose("Status Blob type is unspecified " + "-- assuming it is a BlockBlob") + + try: + self.status_blob.prepare(blob_type) + except Exception as e: + self.report_status_event( + "Exception creating status blob: {0}", ustr(e)) + return + + if not HostPluginProtocol.is_default_channel(): + try: + if self.status_blob.upload(blob_uri): + return + except HttpError as e: + pass + + host = self.get_host_plugin() + host.put_vm_status(self.status_blob, + ext_conf.status_upload_blob, + ext_conf.status_upload_blob_type) + HostPluginProtocol.set_default_channel(True) + return - try: - self.status_blob.prepare(blob_type) except Exception as e: + # If the HostPlugin rejects the request, + # let the error continue, but set to use the HostPlugin + if isinstance(e, ResourceGoneError): + HostPluginProtocol.set_default_channel(True) + continue + self.report_status_event( - "Exception creating status blob: {0}", - e) + "Exception uploading status blob: {0}", ustr(e)) return - uploaded = False - if not HostPluginProtocol.is_default_channel(): - try: - uploaded = self.status_blob.upload(blob_uri) - except HttpError as e: - pass - - if not uploaded: - host = self.get_host_plugin() - host.put_vm_status(self.status_blob, - ext_conf.status_upload_blob, - ext_conf.status_upload_blob_type) - def report_role_prop(self, thumbprint): goal_state = self.get_goal_state() role_prop = _build_role_properties(goal_state.container_id, @@ -896,11 +940,12 @@ class WireClient(object): health_report_uri, health_report, headers=headers, - max_retry=30) + max_retry=30, + retry_delay=15) except HttpError as e: raise ProtocolError((u"Failed to send provision status: " u"{0}").format(e)) - if resp.status != httpclient.OK: + if restutil.request_failed(resp): raise ProtocolError((u"Failed to send provision status: " u",{0}: {1}").format(resp.status, resp.read())) @@ -919,7 +964,7 @@ class WireClient(object): except HttpError as e: raise ProtocolError("Failed to send events:{0}".format(e)) - if resp.status != httpclient.OK: + if restutil.request_failed(resp): logger.verbose(resp.read()) raise ProtocolError( "Failed to send events:{0}".format(resp.status)) @@ -979,12 +1024,8 @@ class WireClient(object): "x-ms-guest-agent-public-x509-cert": cert } - def get_host_plugin(self, force_update=False): - if self.host_plugin is None or force_update: - if force_update: - logger.warn("Forcing update of goal state") - self.goal_state = None - self.update_goal_state(forced=True) + def get_host_plugin(self): + if self.host_plugin is None: goal_state = self.get_goal_state() self.host_plugin = HostPluginProtocol(self.endpoint, goal_state.container_id, @@ -997,23 +1038,47 @@ class WireClient(object): def get_artifacts_profile(self): artifacts_profile = None - if self.has_artifacts_profile_blob(): - blob = self.ext_conf.artifacts_profile_blob - logger.verbose("Getting the artifacts profile") - profile = self.fetch(blob) + for update_goal_state in [False, True]: + try: + if update_goal_state: + self.update_goal_state(forced=True) - if profile is None: - logger.warn("Download failed, falling back to host plugin") - host = self.get_host_plugin() - uri, headers = host.get_artifact_request(blob) - profile = self.decode_config(self.fetch(uri, headers, chk_proxy=False)) + if self.has_artifacts_profile_blob(): + blob = self.ext_conf.artifacts_profile_blob - if not textutil.is_str_none_or_whitespace(profile): - logger.verbose("Artifacts profile downloaded successfully") - artifacts_profile = InVMArtifactsProfile(profile) + profile = None + if not HostPluginProtocol.is_default_channel(): + logger.verbose("Retrieving the artifacts profile") + profile = self.fetch(blob) - return artifacts_profile + if profile is None: + if HostPluginProtocol.is_default_channel(): + logger.verbose("Using host plugin as default channel") + else: + logger.verbose("Failed to download artifacts profile, " + "switching to host plugin") + host = self.get_host_plugin() + uri, headers = host.get_artifact_request(blob) + config = self.fetch(uri, headers, use_proxy=False) + profile = self.decode_config(config) + + if not textutil.is_str_none_or_whitespace(profile): + logger.verbose("Artifacts profile downloaded") + artifacts_profile = InVMArtifactsProfile(profile) + + return artifacts_profile + + except ResourceGoneError: + HostPluginProtocol.set_default_channel(True) + continue + + except Exception as e: + logger.warn( + "Exception retrieving artifacts profile: {0}".format( + ustr(e))) + + return None class VersionInfo(object): def __init__(self, xml_text): |