summaryrefslogtreecommitdiff
path: root/azurelinuxagent/common/protocol/wire.py
diff options
context:
space:
mode:
Diffstat (limited to 'azurelinuxagent/common/protocol/wire.py')
-rw-r--r--azurelinuxagent/common/protocol/wire.py221
1 files changed, 138 insertions, 83 deletions
diff --git a/azurelinuxagent/common/protocol/wire.py b/azurelinuxagent/common/protocol/wire.py
index 4f3b7e0..963d33c 100644
--- a/azurelinuxagent/common/protocol/wire.py
+++ b/azurelinuxagent/common/protocol/wire.py
@@ -18,6 +18,7 @@
import json
import os
+import random
import re
import time
import xml.sax.saxutils as saxutils
@@ -27,7 +28,7 @@ import azurelinuxagent.common.utils.fileutil as fileutil
import azurelinuxagent.common.utils.textutil as textutil
from azurelinuxagent.common.exception import ProtocolNotFoundError, \
- ResourceGoneError
+ ResourceGoneError, RestartError
from azurelinuxagent.common.future import httpclient, bytebuffer
from azurelinuxagent.common.protocol.hostplugin import HostPluginProtocol
from azurelinuxagent.common.protocol.restapi import *
@@ -51,6 +52,7 @@ P7M_FILE_NAME = "Certificates.p7m"
PEM_FILE_NAME = "Certificates.pem"
EXT_CONF_FILE_NAME = "ExtensionsConfig.{0}.xml"
MANIFEST_FILE_NAME = "{0}.{1}.manifest.xml"
+AGENTS_MANIFEST_FILE_NAME = "{0}.{1}.agentsManifest"
TRANSPORT_CERT_FILE_NAME = "TransportCert.pem"
TRANSPORT_PRV_FILE_NAME = "TransportPrivate.pem"
@@ -58,17 +60,12 @@ PROTOCOL_VERSION = "2012-11-30"
ENDPOINT_FINE_NAME = "WireServer"
SHORT_WAITING_INTERVAL = 1 # 1 second
-LONG_WAITING_INTERVAL = 15 # 15 seconds
class UploadError(HttpError):
pass
-class WireProtocolResourceGone(ProtocolError):
- pass
-
-
class WireProtocol(Protocol):
"""Slim layer to adapt wire protocol data to metadata protocol interface"""
@@ -119,6 +116,13 @@ class WireProtocol(Protocol):
certificates = self.client.get_certs()
return certificates.cert_list
+ def get_incarnation(self):
+ path = os.path.join(conf.get_lib_dir(), INCARNATION_FILE_NAME)
+ if os.path.exists(path):
+ return fileutil.read_file(path)
+ else:
+ return 0
+
def get_vmagent_manifests(self):
# Update goal state to get latest extensions config
self.update_goal_state()
@@ -128,8 +132,9 @@ class WireProtocol(Protocol):
def get_vmagent_pkgs(self, vmagent_manifest):
goal_state = self.client.get_goal_state()
- man = self.client.get_gafamily_manifest(vmagent_manifest, goal_state)
- return man.pkg_list
+ ga_manifest = self.client.get_gafamily_manifest(vmagent_manifest, goal_state)
+ valid_pkg_list = self.client.filter_package_list(vmagent_manifest.family, ga_manifest, goal_state)
+ return valid_pkg_list
def get_ext_handlers(self):
logger.verbose("Get extension handler config")
@@ -140,10 +145,9 @@ class WireProtocol(Protocol):
# In wire protocol, incarnation is equivalent to ETag
return ext_conf.ext_handlers, goal_state.incarnation
- def get_ext_handler_pkgs(self, ext_handler):
+ def get_ext_handler_pkgs(self, ext_handler, etag):
logger.verbose("Get extension handler package")
- goal_state = self.client.get_goal_state()
- man = self.client.get_ext_manifest(ext_handler, goal_state)
+ man = self.client.get_ext_manifest(ext_handler, etag)
return man.pkg_list
def get_artifacts_profile(self):
@@ -156,10 +160,10 @@ class WireProtocol(Protocol):
if package is not None:
return package
else:
- logger.warn("Download did not succeed, falling back to host plugin")
+ logger.verbose("Download did not succeed, falling back to host plugin")
host = self.client.get_host_plugin()
uri, headers = host.get_artifact_request(uri, host.manifest_uri)
- package = super(WireProtocol, self).download_ext_handler_pkg(uri, headers=headers)
+ package = super(WireProtocol, self).download_ext_handler_pkg(uri, headers=headers, use_proxy=False)
return package
def report_provision_status(self, provision_status):
@@ -248,22 +252,28 @@ Convert VMStatus object to status blob format
"""
+def ga_status_to_guest_info(ga_status):
+ v1_ga_guest_info = {
+ "computerName" : ga_status.hostname,
+ "osName" : ga_status.osname,
+ "osVersion" : ga_status.osversion,
+ "version" : ga_status.version,
+ }
+ return v1_ga_guest_info
+
+
def ga_status_to_v1(ga_status):
formatted_msg = {
'lang': 'en-US',
'message': ga_status.message
}
v1_ga_status = {
- 'version': ga_status.version,
- 'status': ga_status.status,
- 'osversion': ga_status.osversion,
- 'osname': ga_status.osname,
- 'hostname': ga_status.hostname,
- 'formattedMessage': formatted_msg
+ "version" : ga_status.version,
+ "status" : ga_status.status,
+ "formattedMessage" : formatted_msg
}
return v1_ga_status
-
def ext_substatus_to_v1(sub_status_list):
status_list = []
for substatus in sub_status_list:
@@ -318,6 +328,9 @@ def ext_handler_status_to_v1(handler_status, ext_statuses, timestamp):
"message": handler_status.message
}
+ if handler_status.upgradeGuid is not None:
+ v1_handler_status["upgradeGuid"] = handler_status.upgradeGuid
+
if len(handler_status.extensions) > 0:
# Currently, no more than one extension per handler
ext_name = handler_status.extensions[0]
@@ -334,6 +347,7 @@ def ext_handler_status_to_v1(handler_status, ext_statuses, timestamp):
def vm_status_to_v1(vm_status, ext_statuses):
timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
+ v1_ga_guest_info = ga_status_to_guest_info(vm_status.vmAgent)
v1_ga_status = ga_status_to_v1(vm_status.vmAgent)
v1_handler_status_list = []
for handler_status in vm_status.vmAgent.extensionHandlers:
@@ -349,7 +363,8 @@ def vm_status_to_v1(vm_status, ext_statuses):
v1_vm_status = {
'version': '1.1',
'timestampUTC': timestamp,
- 'aggregateStatus': v1_agg_status
+ 'aggregateStatus': v1_agg_status,
+ 'guestOSInfo' : v1_ga_guest_info
}
return v1_vm_status
@@ -512,51 +527,29 @@ class WireClient(object):
self.shared_conf = None
self.certs = None
self.ext_conf = None
- self.last_request = 0
- self.req_count = 0
self.host_plugin = None
self.status_blob = StatusBlob(self)
- def prevent_throttling(self):
- """
- Try to avoid throttling of wire server
- """
- now = time.time()
- if now - self.last_request < 1:
- logger.verbose("Last request issued less than 1 second ago")
- logger.verbose("Sleep {0} second to avoid throttling.",
- SHORT_WAITING_INTERVAL)
- time.sleep(SHORT_WAITING_INTERVAL)
- self.last_request = now
-
- self.req_count += 1
- if self.req_count % 3 == 0:
- logger.verbose("Sleep {0} second to avoid throttling.",
- SHORT_WAITING_INTERVAL)
- time.sleep(SHORT_WAITING_INTERVAL)
- self.req_count = 0
-
def call_wireserver(self, http_req, *args, **kwargs):
- self.prevent_throttling()
-
try:
# Never use the HTTP proxy for wireserver
kwargs['use_proxy'] = False
resp = http_req(*args, **kwargs)
+
+ if restutil.request_failed(resp):
+ msg = "[Wireserver Failed] URI {0} ".format(args[0])
+ if resp is not None:
+ msg += " [HTTP Failed] Status Code {0}".format(resp.status)
+ raise ProtocolError(msg)
+
+ # If the GoalState is stale, pass along the exception to the caller
+ except ResourceGoneError:
+ raise
+
except Exception as e:
raise ProtocolError("[Wireserver Exception] {0}".format(
ustr(e)))
- if resp is not None and resp.status == httpclient.GONE:
- msg = args[0] if len(args) > 0 else ""
- raise WireProtocolResourceGone(msg)
-
- elif restutil.request_failed(resp):
- msg = "[Wireserver Failed] URI {0} ".format(args[0])
- if resp is not None:
- msg += " [HTTP Failed] Status Code {0}".format(resp.status)
- raise ProtocolError(msg)
-
return resp
def decode_config(self, data):
@@ -598,7 +591,10 @@ class WireClient(object):
def fetch_manifest(self, version_uris):
logger.verbose("Fetch manifest")
- for version in version_uris:
+ version_uris_shuffled = version_uris
+ random.shuffle(version_uris_shuffled)
+
+ for version in version_uris_shuffled:
response = None
if not HostPluginProtocol.is_default_channel():
response = self.fetch(version.uri)
@@ -698,26 +694,28 @@ class WireClient(object):
self.ext_conf = ExtensionsConfig(xml_text)
def update_goal_state(self, forced=False, max_retry=3):
- uri = GOAL_STATE_URI.format(self.endpoint)
- xml_text = self.fetch_config(uri, self.get_header())
- goal_state = GoalState(xml_text)
-
incarnation_file = os.path.join(conf.get_lib_dir(),
INCARNATION_FILE_NAME)
+ uri = GOAL_STATE_URI.format(self.endpoint)
- if not forced:
- last_incarnation = None
- if os.path.isfile(incarnation_file):
- last_incarnation = fileutil.read_file(incarnation_file)
- new_incarnation = goal_state.incarnation
- if last_incarnation is not None and \
- last_incarnation == new_incarnation:
- # Goalstate is not updated.
- return
-
- # Start updating goalstate, retry on 410
+ goal_state = None
for retry in range(0, max_retry):
try:
+ if goal_state is None:
+ xml_text = self.fetch_config(uri, self.get_header())
+ goal_state = GoalState(xml_text)
+
+ if not forced:
+ last_incarnation = None
+ if os.path.isfile(incarnation_file):
+ last_incarnation = fileutil.read_file(
+ incarnation_file)
+ new_incarnation = goal_state.incarnation
+ if last_incarnation is not None and \
+ last_incarnation == new_incarnation:
+ # Goalstate is not updated.
+ return
+
self.goal_state = goal_state
file_name = GOAL_STATE_FILE_NAME.format(goal_state.incarnation)
goal_state_file = os.path.join(conf.get_lib_dir(), file_name)
@@ -727,21 +725,29 @@ class WireClient(object):
self.update_certs(goal_state)
self.update_ext_conf(goal_state)
self.save_cache(incarnation_file, goal_state.incarnation)
+
if self.host_plugin is not None:
self.host_plugin.container_id = goal_state.container_id
self.host_plugin.role_config_name = goal_state.role_config_name
+
return
- except ProtocolError:
+ except ResourceGoneError:
+ logger.info("GoalState is stale -- re-fetching")
+ goal_state = None
+
+ except Exception as e:
+ log_method = logger.info \
+ if type(e) is ProtocolError \
+ else logger.warn
+ log_method(
+ "Exception processing GoalState-related files: {0}".format(
+ ustr(e)))
+
if retry < max_retry-1:
continue
raise
- except WireProtocolResourceGone:
- logger.info("Incarnation is out of date. Update goalstate.")
- xml_text = self.fetch_config(uri, self.get_header())
- goal_state = GoalState(xml_text)
-
raise ProtocolError("Exceeded max retry updating goal state")
def get_goal_state(self):
@@ -793,25 +799,70 @@ class WireClient(object):
self.ext_conf = ExtensionsConfig(xml_text)
return self.ext_conf
- def get_ext_manifest(self, ext_handler, goal_state):
+ def get_ext_manifest(self, ext_handler, incarnation):
+
for update_goal_state in [False, True]:
try:
if update_goal_state:
self.update_goal_state(forced=True)
- goal_state = self.get_goal_state()
+ incarnation = self.get_goal_state().incarnation
local_file = MANIFEST_FILE_NAME.format(
ext_handler.name,
- goal_state.incarnation)
+ incarnation)
local_file = os.path.join(conf.get_lib_dir(), local_file)
- xml_text = self.fetch_manifest(ext_handler.versionUris)
- self.save_cache(local_file, xml_text)
+
+ xml_text = None
+ if not update_goal_state:
+ try:
+ xml_text = self.fetch_cache(local_file)
+ except ProtocolError:
+ pass
+
+ if xml_text is None:
+ xml_text = self.fetch_manifest(ext_handler.versionUris)
+ self.save_cache(local_file, xml_text)
return ExtensionManifest(xml_text)
except ResourceGoneError:
continue
- raise ProtocolError("Failed to retrieve extension manifest")
+ raise RestartError("Failed to retrieve extension manifest")
+
+ def filter_package_list(self, family, ga_manifest, goal_state):
+ complete_list = ga_manifest.pkg_list
+ agent_manifest = os.path.join(conf.get_lib_dir(),
+ AGENTS_MANIFEST_FILE_NAME.format(
+ family,
+ goal_state.incarnation))
+
+ if not os.path.exists(agent_manifest):
+ # clear memory cache
+ ga_manifest.allowed_versions = None
+
+ # create disk cache
+ with open(agent_manifest, mode='w') as manifest_fh:
+ for version in complete_list.versions:
+ manifest_fh.write('{0}\n'.format(version.version))
+ fileutil.chmod(agent_manifest, 0o644)
+
+ return complete_list
+
+ else:
+ # use allowed versions from cache, otherwise from disk
+ if ga_manifest.allowed_versions is None:
+ with open(agent_manifest, mode='r') as manifest_fh:
+ ga_manifest.allowed_versions = [v.strip('\n') for v
+ in manifest_fh.readlines()]
+
+ # use the updated manifest urls for allowed versions
+ allowed_list = ExtHandlerPackageList()
+ allowed_list.versions = [version for version
+ in complete_list.versions
+ if version.version
+ in ga_manifest.allowed_versions]
+
+ return allowed_list
def get_gafamily_manifest(self, vmagent_manifest, goal_state):
for update_goal_state in [False, True]:
@@ -844,7 +895,7 @@ class WireClient(object):
logger.info("Wire protocol version:{0}", PROTOCOL_VERSION)
elif PROTOCOL_VERSION in version_info.get_supported():
logger.info("Wire protocol version:{0}", PROTOCOL_VERSION)
- logger.warn("Server preferred version:{0}", preferred)
+ logger.info("Server preferred version:{0}", preferred)
else:
error = ("Agent supported wire protocol version: {0} was not "
"advised by Fabric.").format(PROTOCOL_VERSION)
@@ -1360,6 +1411,9 @@ class ExtensionsConfig(object):
ext_handler.properties.version = getattrib(plugin, "version")
ext_handler.properties.state = getattrib(plugin, "state")
+ ext_handler.properties.upgradeGuid = getattrib(plugin, "upgradeGuid")
+ if not ext_handler.properties.upgradeGuid:
+ ext_handler.properties.upgradeGuid = None
auto_upgrade = getattrib(plugin, "autoUpgrade")
if auto_upgrade is not None and auto_upgrade.lower() == "true":
ext_handler.properties.upgradePolicy = "auto"
@@ -1418,6 +1472,7 @@ class ExtensionManifest(object):
raise ValueError("ExtensionManifest is None")
logger.verbose("Load ExtensionManifest.xml")
self.pkg_list = ExtHandlerPackageList()
+ self.allowed_versions = None
self.parse(xml_text)
def parse(self, xml_text):