summaryrefslogtreecommitdiff
path: root/azurelinuxagent/common/protocol/wire.py
diff options
context:
space:
mode:
Diffstat (limited to 'azurelinuxagent/common/protocol/wire.py')
-rw-r--r--azurelinuxagent/common/protocol/wire.py112
1 files changed, 65 insertions, 47 deletions
diff --git a/azurelinuxagent/common/protocol/wire.py b/azurelinuxagent/common/protocol/wire.py
index 71c3e37..265e2dd 100644
--- a/azurelinuxagent/common/protocol/wire.py
+++ b/azurelinuxagent/common/protocol/wire.py
@@ -370,11 +370,14 @@ class StatusBlob(object):
__storage_version__ = "2014-02-14"
+ def prepare(self, blob_type):
+ logger.verbose("Prepare status blob")
+ self.data = self.to_json()
+ self.type = blob_type
+
def upload(self, url):
# TODO upload extension only if content has changed
- logger.verbose("Upload status blob")
upload_successful = False
- self.data = self.to_json()
self.type = self.get_blob_type(url)
try:
if self.type == "BlockBlob":
@@ -384,7 +387,12 @@ class StatusBlob(object):
else:
raise ProtocolError("Unknown blob type: {0}".format(self.type))
except HttpError as e:
- logger.warn("Initial upload failed [{0}]".format(e))
+ message = "Initial upload failed [{0}]".format(e)
+ logger.warn(message)
+ from azurelinuxagent.common.event import WALAEventOperation, report_event
+ report_event(op=WALAEventOperation.ReportStatus,
+ is_success=False,
+ message=message)
else:
logger.verbose("Uploading status blob succeeded")
upload_successful = True
@@ -411,48 +419,54 @@ class StatusBlob(object):
logger.verbose("Blob type: [{0}]", blob_type)
return blob_type
+ def get_block_blob_headers(self, blob_size):
+ return {
+ "Content-Length": ustr(blob_size),
+ "x-ms-blob-type": "BlockBlob",
+ "x-ms-date": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
+ "x-ms-version": self.__class__.__storage_version__
+ }
+
def put_block_blob(self, url, data):
logger.verbose("Put block blob")
- timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
- resp = self.client.call_storage_service(
- restutil.http_put,
- url,
- data,
- {
- "x-ms-date": timestamp,
- "x-ms-blob-type": "BlockBlob",
- "Content-Length": ustr(len(data)),
- "x-ms-version": self.__class__.__storage_version__
- })
+ headers = self.get_block_blob_headers(len(data))
+ resp = self.client.call_storage_service(restutil.http_put, url, data, headers)
if resp.status != httpclient.CREATED:
raise UploadError(
"Failed to upload block blob: {0}".format(resp.status))
+ def get_page_blob_create_headers(self, blob_size):
+ return {
+ "Content-Length": "0",
+ "x-ms-blob-content-length": ustr(blob_size),
+ "x-ms-blob-type": "PageBlob",
+ "x-ms-date": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
+ "x-ms-version": self.__class__.__storage_version__
+ }
+
+ def get_page_blob_page_headers(self, start, end):
+ return {
+ "Content-Length": ustr(end - start),
+ "x-ms-date": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
+ "x-ms-range": "bytes={0}-{1}".format(start, end - 1),
+ "x-ms-page-write": "update",
+ "x-ms-version": self.__class__.__storage_version__
+ }
+
def put_page_blob(self, url, data):
logger.verbose("Put page blob")
- # Convert string into bytes
+ # Convert string into bytes and align to 512 bytes
data = bytearray(data, encoding='utf-8')
- timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
-
- # Align to 512 bytes
page_blob_size = int((len(data) + 511) / 512) * 512
- resp = self.client.call_storage_service(
- restutil.http_put,
- url,
- "",
- {
- "x-ms-date": timestamp,
- "x-ms-blob-type": "PageBlob",
- "Content-Length": "0",
- "x-ms-blob-content-length": ustr(page_blob_size),
- "x-ms-version": self.__class__.__storage_version__
- })
+
+ headers = self.get_page_blob_create_headers(page_blob_size)
+ resp = self.client.call_storage_service(restutil.http_put, url, "", headers)
if resp.status != httpclient.CREATED:
raise UploadError(
"Failed to clean up page blob: {0}".format(resp.status))
- if url.count("?") < 0:
+ if url.count("?") <= 0:
url = "{0}?comp=page".format(url)
else:
url = "{0}&comp=page".format(url)
@@ -469,17 +483,12 @@ class StatusBlob(object):
buf_size = page_end - start
buf = bytearray(buf_size)
buf[0: content_size] = data[start: end]
+ headers = self.get_page_blob_page_headers(start, page_end)
resp = self.client.call_storage_service(
restutil.http_put,
url,
bytebuffer(buf),
- {
- "x-ms-date": timestamp,
- "x-ms-range": "bytes={0}-{1}".format(start, page_end - 1),
- "x-ms-page-write": "update",
- "x-ms-version": self.__class__.__storage_version__,
- "Content-Length": ustr(page_end - start)
- })
+ headers)
if resp is None or resp.status != httpclient.CREATED:
raise UploadError(
"Failed to upload page blob: {0}".format(resp.status))
@@ -634,9 +643,14 @@ class WireClient(object):
def fetch_manifest(self, version_uris):
logger.verbose("Fetch manifest")
for version in version_uris:
- response = self.fetch(version.uri)
+ response = None
+ if not HostPluginProtocol.is_default_channel():
+ response = self.fetch(version.uri)
if not response:
- logger.verbose("Manifest could not be downloaded, falling back to host plugin")
+ if HostPluginProtocol.is_default_channel():
+ logger.verbose("Using host plugin as default channel")
+ else:
+ logger.verbose("Manifest could not be downloaded, falling back to host plugin")
host = self.get_host_plugin()
uri, headers = host.get_artifact_request(version.uri)
response = self.fetch(uri, headers)
@@ -648,6 +662,9 @@ class WireClient(object):
else:
host.manifest_uri = version.uri
logger.verbose("Manifest downloaded successfully from host plugin")
+ if not HostPluginProtocol.is_default_channel():
+ logger.info("Setting host plugin as default channel")
+ HostPluginProtocol.set_default_channel(True)
if response:
return response
raise ProtocolError("Failed to fetch manifest from all sources")
@@ -663,12 +680,11 @@ class WireClient(object):
if resp.status == httpclient.OK:
return_value = self.decode_config(resp.read())
else:
- logger.warn("Could not fetch {0} [{1}: {2}]",
+ logger.warn("Could not fetch {0} [{1}]",
uri,
- resp.status,
- resp.reason)
+ HostPluginProtocol.read_response_error(resp))
except (HttpError, ProtocolError) as e:
- logger.verbose("Fetch failed from [{0}]", uri)
+ logger.verbose("Fetch failed from [{0}]: {1}", uri, e)
return return_value
def update_hosting_env(self, goal_state):
@@ -839,10 +855,12 @@ class WireClient(object):
if ext_conf.status_upload_blob is not None:
uploaded = False
try:
- uploaded = self.status_blob.upload(ext_conf.status_upload_blob)
- self.report_blob_type(self.status_blob.type,
- ext_conf.status_upload_blob_type)
- except (HttpError, ProtocolError) as e:
+ self.status_blob.prepare(ext_conf.status_upload_blob_type)
+ if not HostPluginProtocol.is_default_channel():
+ uploaded = self.status_blob.upload(ext_conf.status_upload_blob)
+ self.report_blob_type(self.status_blob.type,
+ ext_conf.status_upload_blob_type)
+ except (HttpError, ProtocolError):
# errors have already been logged
pass
if not uploaded: