summaryrefslogtreecommitdiff
path: root/azurelinuxagent/common/protocol/wire.py
diff options
context:
space:
mode:
Diffstat (limited to 'azurelinuxagent/common/protocol/wire.py')
-rw-r--r--azurelinuxagent/common/protocol/wire.py343
1 files changed, 204 insertions, 139 deletions
diff --git a/azurelinuxagent/common/protocol/wire.py b/azurelinuxagent/common/protocol/wire.py
index d731e11..4f3b7e0 100644
--- a/azurelinuxagent/common/protocol/wire.py
+++ b/azurelinuxagent/common/protocol/wire.py
@@ -26,7 +26,8 @@ import azurelinuxagent.common.conf as conf
import azurelinuxagent.common.utils.fileutil as fileutil
import azurelinuxagent.common.utils.textutil as textutil
-from azurelinuxagent.common.exception import ProtocolNotFoundError
+from azurelinuxagent.common.exception import ProtocolNotFoundError, \
+ ResourceGoneError
from azurelinuxagent.common.future import httpclient, bytebuffer
from azurelinuxagent.common.protocol.hostplugin import HostPluginProtocol
from azurelinuxagent.common.protocol.restapi import *
@@ -96,7 +97,10 @@ class WireProtocol(Protocol):
cryptutil = CryptUtil(conf.get_openssl_cmd())
cryptutil.gen_transport_cert(trans_prv_file, trans_cert_file)
- self.client.update_goal_state(forced=True)
+ self.update_goal_state(forced=True)
+
+ def update_goal_state(self, forced=False, max_retry=3):
+ self.client.update_goal_state(forced=forced, max_retry=max_retry)
def get_vminfo(self):
goal_state = self.client.get_goal_state()
@@ -117,7 +121,7 @@ class WireProtocol(Protocol):
def get_vmagent_manifests(self):
# Update goal state to get latest extensions config
- self.client.update_goal_state()
+ self.update_goal_state()
goal_state = self.client.get_goal_state()
ext_conf = self.client.get_ext_conf()
return ext_conf.vmagent_manifests, goal_state.incarnation
@@ -130,7 +134,7 @@ class WireProtocol(Protocol):
def get_ext_handlers(self):
logger.verbose("Get extension handler config")
# Update goal state to get latest extensions config
- self.client.update_goal_state()
+ self.update_goal_state()
goal_state = self.client.get_goal_state()
ext_conf = self.client.get_ext_conf()
# In wire protocol, incarnation is equivalent to ETag
@@ -533,29 +537,27 @@ class WireClient(object):
self.req_count = 0
def call_wireserver(self, http_req, *args, **kwargs):
- """
- Call wire server; handle throttling (403), resource gone (410) and
- service unavailable (503).
- """
self.prevent_throttling()
- for retry in range(0, 3):
+
+ try:
+ # Never use the HTTP proxy for wireserver
+ kwargs['use_proxy'] = False
resp = http_req(*args, **kwargs)
- if resp.status == httpclient.FORBIDDEN:
- logger.warn("Sending too many requests to wire server. ")
- logger.info("Sleeping {0}s to avoid throttling.",
- LONG_WAITING_INTERVAL)
- time.sleep(LONG_WAITING_INTERVAL)
- elif resp.status == httpclient.SERVICE_UNAVAILABLE:
- logger.warn("Service temporarily unavailable, sleeping {0}s "
- "before retrying.", LONG_WAITING_INTERVAL)
- time.sleep(LONG_WAITING_INTERVAL)
- elif resp.status == httpclient.GONE:
- msg = args[0] if len(args) > 0 else ""
- raise WireProtocolResourceGone(msg)
- else:
- return resp
- raise ProtocolError(("Calling wire server failed: "
- "{0}").format(resp.status))
+ except Exception as e:
+ raise ProtocolError("[Wireserver Exception] {0}".format(
+ ustr(e)))
+
+ if resp is not None and resp.status == httpclient.GONE:
+ msg = args[0] if len(args) > 0 else ""
+ raise WireProtocolResourceGone(msg)
+
+ elif restutil.request_failed(resp):
+ msg = "[Wireserver Failed] URI {0} ".format(args[0])
+ if resp is not None:
+ msg += " [HTTP Failed] Status Code {0}".format(resp.status)
+ raise ProtocolError(msg)
+
+ return resp
def decode_config(self, data):
if data is None:
@@ -565,16 +567,9 @@ class WireClient(object):
return xml_text
def fetch_config(self, uri, headers):
- try:
- resp = self.call_wireserver(restutil.http_get,
- uri,
- headers=headers)
- except HttpError as e:
- raise ProtocolError(ustr(e))
-
- if resp.status != httpclient.OK:
- raise ProtocolError("{0} - {1}".format(resp.status, uri))
-
+ resp = self.call_wireserver(restutil.http_get,
+ uri,
+ headers=headers)
return self.decode_config(resp.read())
def fetch_cache(self, local_file):
@@ -589,29 +584,17 @@ class WireClient(object):
try:
fileutil.write_file(local_file, data)
except IOError as e:
+ fileutil.clean_ioerror(e,
+ paths=[local_file])
raise ProtocolError("Failed to write cache: {0}".format(e))
@staticmethod
def call_storage_service(http_req, *args, **kwargs):
- """
- Call storage service, handle SERVICE_UNAVAILABLE(503)
- """
-
# Default to use the configured HTTP proxy
- if not 'chk_proxy' in kwargs or kwargs['chk_proxy'] is None:
- kwargs['chk_proxy'] = True
+ if not 'use_proxy' in kwargs or kwargs['use_proxy'] is None:
+ kwargs['use_proxy'] = True
- for retry in range(0, 3):
- resp = http_req(*args, **kwargs)
- if resp.status == httpclient.SERVICE_UNAVAILABLE:
- logger.warn("Storage service is temporarily unavailable. ")
- logger.info("Will retry in {0} seconds. ",
- LONG_WAITING_INTERVAL)
- time.sleep(LONG_WAITING_INTERVAL)
- else:
- return resp
- raise ProtocolError(("Calling storage endpoint failed: "
- "{0}").format(resp.status))
+ return http_req(*args, **kwargs)
def fetch_manifest(self, version_uris):
logger.verbose("Fetch manifest")
@@ -619,47 +602,61 @@ class WireClient(object):
response = None
if not HostPluginProtocol.is_default_channel():
response = self.fetch(version.uri)
+
if not response:
if HostPluginProtocol.is_default_channel():
logger.verbose("Using host plugin as default channel")
else:
- logger.verbose("Manifest could not be downloaded, falling back to host plugin")
- host = self.get_host_plugin()
- uri, headers = host.get_artifact_request(version.uri)
- response = self.fetch(uri, headers, chk_proxy=False)
- if not response:
- host = self.get_host_plugin(force_update=True)
- logger.info("Retry fetch in {0} seconds",
- SHORT_WAITING_INTERVAL)
- time.sleep(SHORT_WAITING_INTERVAL)
- else:
- host.manifest_uri = version.uri
- logger.verbose("Manifest downloaded successfully from host plugin")
- if not HostPluginProtocol.is_default_channel():
- logger.info("Setting host plugin as default channel")
- HostPluginProtocol.set_default_channel(True)
+ logger.verbose("Failed to download manifest, "
+ "switching to host plugin")
+
+ try:
+ host = self.get_host_plugin()
+ uri, headers = host.get_artifact_request(version.uri)
+ response = self.fetch(uri, headers, use_proxy=False)
+
+ # If the HostPlugin rejects the request,
+ # let the error continue, but set to use the HostPlugin
+ except ResourceGoneError:
+ HostPluginProtocol.set_default_channel(True)
+ raise
+
+ host.manifest_uri = version.uri
+ logger.verbose("Manifest downloaded successfully from host plugin")
+ if not HostPluginProtocol.is_default_channel():
+ logger.info("Setting host plugin as default channel")
+ HostPluginProtocol.set_default_channel(True)
+
if response:
return response
+
raise ProtocolError("Failed to fetch manifest from all sources")
- def fetch(self, uri, headers=None, chk_proxy=None):
+ def fetch(self, uri, headers=None, use_proxy=None):
logger.verbose("Fetch [{0}] with headers [{1}]", uri, headers)
- return_value = None
try:
resp = self.call_storage_service(
- restutil.http_get,
- uri,
- headers,
- chk_proxy=chk_proxy)
- if resp.status == httpclient.OK:
- return_value = self.decode_config(resp.read())
- else:
- logger.warn("Could not fetch {0} [{1}]",
- uri,
- HostPluginProtocol.read_response_error(resp))
+ restutil.http_get,
+ uri,
+ headers=headers,
+ use_proxy=use_proxy)
+
+ if restutil.request_failed(resp):
+ msg = "[Storage Failed] URI {0} ".format(uri)
+ if resp is not None:
+ msg += restutil.read_response_error(resp)
+ logger.warn(msg)
+ raise ProtocolError(msg)
+
+ return self.decode_config(resp.read())
+
except (HttpError, ProtocolError) as e:
logger.verbose("Fetch failed from [{0}]: {1}", uri, e)
- return return_value
+
+ if isinstance(e, ResourceGoneError):
+ raise
+
+ return None
def update_hosting_env(self, goal_state):
if goal_state.hosting_env_uri is None:
@@ -734,6 +731,12 @@ class WireClient(object):
self.host_plugin.container_id = goal_state.container_id
self.host_plugin.role_config_name = goal_state.role_config_name
return
+
+ except ProtocolError:
+ if retry < max_retry-1:
+ continue
+ raise
+
except WireProtocolResourceGone:
logger.info("Incarnation is out of date. Update goalstate.")
xml_text = self.fetch_config(uri, self.get_header())
@@ -791,20 +794,45 @@ class WireClient(object):
return self.ext_conf
def get_ext_manifest(self, ext_handler, goal_state):
- local_file = MANIFEST_FILE_NAME.format(ext_handler.name,
- goal_state.incarnation)
- local_file = os.path.join(conf.get_lib_dir(), local_file)
- xml_text = self.fetch_manifest(ext_handler.versionUris)
- self.save_cache(local_file, xml_text)
- return ExtensionManifest(xml_text)
+ for update_goal_state in [False, True]:
+ try:
+ if update_goal_state:
+ self.update_goal_state(forced=True)
+ goal_state = self.get_goal_state()
+
+ local_file = MANIFEST_FILE_NAME.format(
+ ext_handler.name,
+ goal_state.incarnation)
+ local_file = os.path.join(conf.get_lib_dir(), local_file)
+ xml_text = self.fetch_manifest(ext_handler.versionUris)
+ self.save_cache(local_file, xml_text)
+ return ExtensionManifest(xml_text)
+
+ except ResourceGoneError:
+ continue
+
+ raise ProtocolError("Failed to retrieve extension manifest")
def get_gafamily_manifest(self, vmagent_manifest, goal_state):
- local_file = MANIFEST_FILE_NAME.format(vmagent_manifest.family,
- goal_state.incarnation)
- local_file = os.path.join(conf.get_lib_dir(), local_file)
- xml_text = self.fetch_manifest(vmagent_manifest.versionsManifestUris)
- fileutil.write_file(local_file, xml_text)
- return ExtensionManifest(xml_text)
+ for update_goal_state in [False, True]:
+ try:
+ if update_goal_state:
+ self.update_goal_state(forced=True)
+ goal_state = self.get_goal_state()
+
+ local_file = MANIFEST_FILE_NAME.format(
+ vmagent_manifest.family,
+ goal_state.incarnation)
+ local_file = os.path.join(conf.get_lib_dir(), local_file)
+ xml_text = self.fetch_manifest(
+ vmagent_manifest.versionsManifestUris)
+ fileutil.write_file(local_file, xml_text)
+ return ExtensionManifest(xml_text)
+
+ except ResourceGoneError:
+ continue
+
+ raise ProtocolError("Failed to retrieve GAFamily manifest")
def check_wire_protocol_version(self):
uri = VERSION_INFO_URI.format(self.endpoint)
@@ -823,39 +851,55 @@ class WireClient(object):
raise ProtocolNotFoundError(error)
def upload_status_blob(self):
- ext_conf = self.get_ext_conf()
+ for update_goal_state in [False, True]:
+ try:
+ if update_goal_state:
+ self.update_goal_state(forced=True)
- blob_uri = ext_conf.status_upload_blob
- blob_type = ext_conf.status_upload_blob_type
+ ext_conf = self.get_ext_conf()
- if blob_uri is not None:
+ blob_uri = ext_conf.status_upload_blob
+ blob_type = ext_conf.status_upload_blob_type
- if not blob_type in ["BlockBlob", "PageBlob"]:
- blob_type = "BlockBlob"
- logger.verbose("Status Blob type is unspecified "
- "-- assuming it is a BlockBlob")
+ if blob_uri is not None:
+
+ if not blob_type in ["BlockBlob", "PageBlob"]:
+ blob_type = "BlockBlob"
+ logger.verbose("Status Blob type is unspecified "
+ "-- assuming it is a BlockBlob")
+
+ try:
+ self.status_blob.prepare(blob_type)
+ except Exception as e:
+ self.report_status_event(
+ "Exception creating status blob: {0}", ustr(e))
+ return
+
+ if not HostPluginProtocol.is_default_channel():
+ try:
+ if self.status_blob.upload(blob_uri):
+ return
+ except HttpError as e:
+ pass
+
+ host = self.get_host_plugin()
+ host.put_vm_status(self.status_blob,
+ ext_conf.status_upload_blob,
+ ext_conf.status_upload_blob_type)
+ HostPluginProtocol.set_default_channel(True)
+ return
- try:
- self.status_blob.prepare(blob_type)
except Exception as e:
+ # If the HostPlugin rejects the request,
+ # let the error continue, but set to use the HostPlugin
+ if isinstance(e, ResourceGoneError):
+ HostPluginProtocol.set_default_channel(True)
+ continue
+
self.report_status_event(
- "Exception creating status blob: {0}",
- e)
+ "Exception uploading status blob: {0}", ustr(e))
return
- uploaded = False
- if not HostPluginProtocol.is_default_channel():
- try:
- uploaded = self.status_blob.upload(blob_uri)
- except HttpError as e:
- pass
-
- if not uploaded:
- host = self.get_host_plugin()
- host.put_vm_status(self.status_blob,
- ext_conf.status_upload_blob,
- ext_conf.status_upload_blob_type)
-
def report_role_prop(self, thumbprint):
goal_state = self.get_goal_state()
role_prop = _build_role_properties(goal_state.container_id,
@@ -896,11 +940,12 @@ class WireClient(object):
health_report_uri,
health_report,
headers=headers,
- max_retry=30)
+ max_retry=30,
+ retry_delay=15)
except HttpError as e:
raise ProtocolError((u"Failed to send provision status: "
u"{0}").format(e))
- if resp.status != httpclient.OK:
+ if restutil.request_failed(resp):
raise ProtocolError((u"Failed to send provision status: "
u",{0}: {1}").format(resp.status,
resp.read()))
@@ -919,7 +964,7 @@ class WireClient(object):
except HttpError as e:
raise ProtocolError("Failed to send events:{0}".format(e))
- if resp.status != httpclient.OK:
+ if restutil.request_failed(resp):
logger.verbose(resp.read())
raise ProtocolError(
"Failed to send events:{0}".format(resp.status))
@@ -979,12 +1024,8 @@ class WireClient(object):
"x-ms-guest-agent-public-x509-cert": cert
}
- def get_host_plugin(self, force_update=False):
- if self.host_plugin is None or force_update:
- if force_update:
- logger.warn("Forcing update of goal state")
- self.goal_state = None
- self.update_goal_state(forced=True)
+ def get_host_plugin(self):
+ if self.host_plugin is None:
goal_state = self.get_goal_state()
self.host_plugin = HostPluginProtocol(self.endpoint,
goal_state.container_id,
@@ -997,23 +1038,47 @@ class WireClient(object):
def get_artifacts_profile(self):
artifacts_profile = None
- if self.has_artifacts_profile_blob():
- blob = self.ext_conf.artifacts_profile_blob
- logger.verbose("Getting the artifacts profile")
- profile = self.fetch(blob)
+ for update_goal_state in [False, True]:
+ try:
+ if update_goal_state:
+ self.update_goal_state(forced=True)
- if profile is None:
- logger.warn("Download failed, falling back to host plugin")
- host = self.get_host_plugin()
- uri, headers = host.get_artifact_request(blob)
- profile = self.decode_config(self.fetch(uri, headers, chk_proxy=False))
+ if self.has_artifacts_profile_blob():
+ blob = self.ext_conf.artifacts_profile_blob
- if not textutil.is_str_none_or_whitespace(profile):
- logger.verbose("Artifacts profile downloaded successfully")
- artifacts_profile = InVMArtifactsProfile(profile)
+ profile = None
+ if not HostPluginProtocol.is_default_channel():
+ logger.verbose("Retrieving the artifacts profile")
+ profile = self.fetch(blob)
- return artifacts_profile
+ if profile is None:
+ if HostPluginProtocol.is_default_channel():
+ logger.verbose("Using host plugin as default channel")
+ else:
+ logger.verbose("Failed to download artifacts profile, "
+ "switching to host plugin")
+ host = self.get_host_plugin()
+ uri, headers = host.get_artifact_request(blob)
+ config = self.fetch(uri, headers, use_proxy=False)
+ profile = self.decode_config(config)
+
+ if not textutil.is_str_none_or_whitespace(profile):
+ logger.verbose("Artifacts profile downloaded")
+ artifacts_profile = InVMArtifactsProfile(profile)
+
+ return artifacts_profile
+
+ except ResourceGoneError:
+ HostPluginProtocol.set_default_channel(True)
+ continue
+
+ except Exception as e:
+ logger.warn(
+ "Exception retrieving artifacts profile: {0}".format(
+ ustr(e)))
+
+ return None
class VersionInfo(object):
def __init__(self, xml_text):