diff options
Diffstat (limited to 'azurelinuxagent/common/protocol/wire.py')
-rw-r--r-- | azurelinuxagent/common/protocol/wire.py | 221 |
1 files changed, 138 insertions, 83 deletions
diff --git a/azurelinuxagent/common/protocol/wire.py b/azurelinuxagent/common/protocol/wire.py index 4f3b7e0..963d33c 100644 --- a/azurelinuxagent/common/protocol/wire.py +++ b/azurelinuxagent/common/protocol/wire.py @@ -18,6 +18,7 @@ import json import os +import random import re import time import xml.sax.saxutils as saxutils @@ -27,7 +28,7 @@ import azurelinuxagent.common.utils.fileutil as fileutil import azurelinuxagent.common.utils.textutil as textutil from azurelinuxagent.common.exception import ProtocolNotFoundError, \ - ResourceGoneError + ResourceGoneError, RestartError from azurelinuxagent.common.future import httpclient, bytebuffer from azurelinuxagent.common.protocol.hostplugin import HostPluginProtocol from azurelinuxagent.common.protocol.restapi import * @@ -51,6 +52,7 @@ P7M_FILE_NAME = "Certificates.p7m" PEM_FILE_NAME = "Certificates.pem" EXT_CONF_FILE_NAME = "ExtensionsConfig.{0}.xml" MANIFEST_FILE_NAME = "{0}.{1}.manifest.xml" +AGENTS_MANIFEST_FILE_NAME = "{0}.{1}.agentsManifest" TRANSPORT_CERT_FILE_NAME = "TransportCert.pem" TRANSPORT_PRV_FILE_NAME = "TransportPrivate.pem" @@ -58,17 +60,12 @@ PROTOCOL_VERSION = "2012-11-30" ENDPOINT_FINE_NAME = "WireServer" SHORT_WAITING_INTERVAL = 1 # 1 second -LONG_WAITING_INTERVAL = 15 # 15 seconds class UploadError(HttpError): pass -class WireProtocolResourceGone(ProtocolError): - pass - - class WireProtocol(Protocol): """Slim layer to adapt wire protocol data to metadata protocol interface""" @@ -119,6 +116,13 @@ class WireProtocol(Protocol): certificates = self.client.get_certs() return certificates.cert_list + def get_incarnation(self): + path = os.path.join(conf.get_lib_dir(), INCARNATION_FILE_NAME) + if os.path.exists(path): + return fileutil.read_file(path) + else: + return 0 + def get_vmagent_manifests(self): # Update goal state to get latest extensions config self.update_goal_state() @@ -128,8 +132,9 @@ class WireProtocol(Protocol): def get_vmagent_pkgs(self, vmagent_manifest): goal_state = self.client.get_goal_state() - man = self.client.get_gafamily_manifest(vmagent_manifest, goal_state) - return man.pkg_list + ga_manifest = self.client.get_gafamily_manifest(vmagent_manifest, goal_state) + valid_pkg_list = self.client.filter_package_list(vmagent_manifest.family, ga_manifest, goal_state) + return valid_pkg_list def get_ext_handlers(self): logger.verbose("Get extension handler config") @@ -140,10 +145,9 @@ class WireProtocol(Protocol): # In wire protocol, incarnation is equivalent to ETag return ext_conf.ext_handlers, goal_state.incarnation - def get_ext_handler_pkgs(self, ext_handler): + def get_ext_handler_pkgs(self, ext_handler, etag): logger.verbose("Get extension handler package") - goal_state = self.client.get_goal_state() - man = self.client.get_ext_manifest(ext_handler, goal_state) + man = self.client.get_ext_manifest(ext_handler, etag) return man.pkg_list def get_artifacts_profile(self): @@ -156,10 +160,10 @@ class WireProtocol(Protocol): if package is not None: return package else: - logger.warn("Download did not succeed, falling back to host plugin") + logger.verbose("Download did not succeed, falling back to host plugin") host = self.client.get_host_plugin() uri, headers = host.get_artifact_request(uri, host.manifest_uri) - package = super(WireProtocol, self).download_ext_handler_pkg(uri, headers=headers) + package = super(WireProtocol, self).download_ext_handler_pkg(uri, headers=headers, use_proxy=False) return package def report_provision_status(self, provision_status): @@ -248,22 +252,28 @@ Convert VMStatus object to status blob format """ +def ga_status_to_guest_info(ga_status): + v1_ga_guest_info = { + "computerName" : ga_status.hostname, + "osName" : ga_status.osname, + "osVersion" : ga_status.osversion, + "version" : ga_status.version, + } + return v1_ga_guest_info + + def ga_status_to_v1(ga_status): formatted_msg = { 'lang': 'en-US', 'message': ga_status.message } v1_ga_status = { - 'version': ga_status.version, - 'status': ga_status.status, - 'osversion': ga_status.osversion, - 'osname': ga_status.osname, - 'hostname': ga_status.hostname, - 'formattedMessage': formatted_msg + "version" : ga_status.version, + "status" : ga_status.status, + "formattedMessage" : formatted_msg } return v1_ga_status - def ext_substatus_to_v1(sub_status_list): status_list = [] for substatus in sub_status_list: @@ -318,6 +328,9 @@ def ext_handler_status_to_v1(handler_status, ext_statuses, timestamp): "message": handler_status.message } + if handler_status.upgradeGuid is not None: + v1_handler_status["upgradeGuid"] = handler_status.upgradeGuid + if len(handler_status.extensions) > 0: # Currently, no more than one extension per handler ext_name = handler_status.extensions[0] @@ -334,6 +347,7 @@ def ext_handler_status_to_v1(handler_status, ext_statuses, timestamp): def vm_status_to_v1(vm_status, ext_statuses): timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + v1_ga_guest_info = ga_status_to_guest_info(vm_status.vmAgent) v1_ga_status = ga_status_to_v1(vm_status.vmAgent) v1_handler_status_list = [] for handler_status in vm_status.vmAgent.extensionHandlers: @@ -349,7 +363,8 @@ def vm_status_to_v1(vm_status, ext_statuses): v1_vm_status = { 'version': '1.1', 'timestampUTC': timestamp, - 'aggregateStatus': v1_agg_status + 'aggregateStatus': v1_agg_status, + 'guestOSInfo' : v1_ga_guest_info } return v1_vm_status @@ -512,51 +527,29 @@ class WireClient(object): self.shared_conf = None self.certs = None self.ext_conf = None - self.last_request = 0 - self.req_count = 0 self.host_plugin = None self.status_blob = StatusBlob(self) - def prevent_throttling(self): - """ - Try to avoid throttling of wire server - """ - now = time.time() - if now - self.last_request < 1: - logger.verbose("Last request issued less than 1 second ago") - logger.verbose("Sleep {0} second to avoid throttling.", - SHORT_WAITING_INTERVAL) - time.sleep(SHORT_WAITING_INTERVAL) - self.last_request = now - - self.req_count += 1 - if self.req_count % 3 == 0: - logger.verbose("Sleep {0} second to avoid throttling.", - SHORT_WAITING_INTERVAL) - time.sleep(SHORT_WAITING_INTERVAL) - self.req_count = 0 - def call_wireserver(self, http_req, *args, **kwargs): - self.prevent_throttling() - try: # Never use the HTTP proxy for wireserver kwargs['use_proxy'] = False resp = http_req(*args, **kwargs) + + if restutil.request_failed(resp): + msg = "[Wireserver Failed] URI {0} ".format(args[0]) + if resp is not None: + msg += " [HTTP Failed] Status Code {0}".format(resp.status) + raise ProtocolError(msg) + + # If the GoalState is stale, pass along the exception to the caller + except ResourceGoneError: + raise + except Exception as e: raise ProtocolError("[Wireserver Exception] {0}".format( ustr(e))) - if resp is not None and resp.status == httpclient.GONE: - msg = args[0] if len(args) > 0 else "" - raise WireProtocolResourceGone(msg) - - elif restutil.request_failed(resp): - msg = "[Wireserver Failed] URI {0} ".format(args[0]) - if resp is not None: - msg += " [HTTP Failed] Status Code {0}".format(resp.status) - raise ProtocolError(msg) - return resp def decode_config(self, data): @@ -598,7 +591,10 @@ class WireClient(object): def fetch_manifest(self, version_uris): logger.verbose("Fetch manifest") - for version in version_uris: + version_uris_shuffled = version_uris + random.shuffle(version_uris_shuffled) + + for version in version_uris_shuffled: response = None if not HostPluginProtocol.is_default_channel(): response = self.fetch(version.uri) @@ -698,26 +694,28 @@ class WireClient(object): self.ext_conf = ExtensionsConfig(xml_text) def update_goal_state(self, forced=False, max_retry=3): - uri = GOAL_STATE_URI.format(self.endpoint) - xml_text = self.fetch_config(uri, self.get_header()) - goal_state = GoalState(xml_text) - incarnation_file = os.path.join(conf.get_lib_dir(), INCARNATION_FILE_NAME) + uri = GOAL_STATE_URI.format(self.endpoint) - if not forced: - last_incarnation = None - if os.path.isfile(incarnation_file): - last_incarnation = fileutil.read_file(incarnation_file) - new_incarnation = goal_state.incarnation - if last_incarnation is not None and \ - last_incarnation == new_incarnation: - # Goalstate is not updated. - return - - # Start updating goalstate, retry on 410 + goal_state = None for retry in range(0, max_retry): try: + if goal_state is None: + xml_text = self.fetch_config(uri, self.get_header()) + goal_state = GoalState(xml_text) + + if not forced: + last_incarnation = None + if os.path.isfile(incarnation_file): + last_incarnation = fileutil.read_file( + incarnation_file) + new_incarnation = goal_state.incarnation + if last_incarnation is not None and \ + last_incarnation == new_incarnation: + # Goalstate is not updated. + return + self.goal_state = goal_state file_name = GOAL_STATE_FILE_NAME.format(goal_state.incarnation) goal_state_file = os.path.join(conf.get_lib_dir(), file_name) @@ -727,21 +725,29 @@ class WireClient(object): self.update_certs(goal_state) self.update_ext_conf(goal_state) self.save_cache(incarnation_file, goal_state.incarnation) + if self.host_plugin is not None: self.host_plugin.container_id = goal_state.container_id self.host_plugin.role_config_name = goal_state.role_config_name + return - except ProtocolError: + except ResourceGoneError: + logger.info("GoalState is stale -- re-fetching") + goal_state = None + + except Exception as e: + log_method = logger.info \ + if type(e) is ProtocolError \ + else logger.warn + log_method( + "Exception processing GoalState-related files: {0}".format( + ustr(e))) + if retry < max_retry-1: continue raise - except WireProtocolResourceGone: - logger.info("Incarnation is out of date. Update goalstate.") - xml_text = self.fetch_config(uri, self.get_header()) - goal_state = GoalState(xml_text) - raise ProtocolError("Exceeded max retry updating goal state") def get_goal_state(self): @@ -793,25 +799,70 @@ class WireClient(object): self.ext_conf = ExtensionsConfig(xml_text) return self.ext_conf - def get_ext_manifest(self, ext_handler, goal_state): + def get_ext_manifest(self, ext_handler, incarnation): + for update_goal_state in [False, True]: try: if update_goal_state: self.update_goal_state(forced=True) - goal_state = self.get_goal_state() + incarnation = self.get_goal_state().incarnation local_file = MANIFEST_FILE_NAME.format( ext_handler.name, - goal_state.incarnation) + incarnation) local_file = os.path.join(conf.get_lib_dir(), local_file) - xml_text = self.fetch_manifest(ext_handler.versionUris) - self.save_cache(local_file, xml_text) + + xml_text = None + if not update_goal_state: + try: + xml_text = self.fetch_cache(local_file) + except ProtocolError: + pass + + if xml_text is None: + xml_text = self.fetch_manifest(ext_handler.versionUris) + self.save_cache(local_file, xml_text) return ExtensionManifest(xml_text) except ResourceGoneError: continue - raise ProtocolError("Failed to retrieve extension manifest") + raise RestartError("Failed to retrieve extension manifest") + + def filter_package_list(self, family, ga_manifest, goal_state): + complete_list = ga_manifest.pkg_list + agent_manifest = os.path.join(conf.get_lib_dir(), + AGENTS_MANIFEST_FILE_NAME.format( + family, + goal_state.incarnation)) + + if not os.path.exists(agent_manifest): + # clear memory cache + ga_manifest.allowed_versions = None + + # create disk cache + with open(agent_manifest, mode='w') as manifest_fh: + for version in complete_list.versions: + manifest_fh.write('{0}\n'.format(version.version)) + fileutil.chmod(agent_manifest, 0o644) + + return complete_list + + else: + # use allowed versions from cache, otherwise from disk + if ga_manifest.allowed_versions is None: + with open(agent_manifest, mode='r') as manifest_fh: + ga_manifest.allowed_versions = [v.strip('\n') for v + in manifest_fh.readlines()] + + # use the updated manifest urls for allowed versions + allowed_list = ExtHandlerPackageList() + allowed_list.versions = [version for version + in complete_list.versions + if version.version + in ga_manifest.allowed_versions] + + return allowed_list def get_gafamily_manifest(self, vmagent_manifest, goal_state): for update_goal_state in [False, True]: @@ -844,7 +895,7 @@ class WireClient(object): logger.info("Wire protocol version:{0}", PROTOCOL_VERSION) elif PROTOCOL_VERSION in version_info.get_supported(): logger.info("Wire protocol version:{0}", PROTOCOL_VERSION) - logger.warn("Server preferred version:{0}", preferred) + logger.info("Server preferred version:{0}", preferred) else: error = ("Agent supported wire protocol version: {0} was not " "advised by Fabric.").format(PROTOCOL_VERSION) @@ -1360,6 +1411,9 @@ class ExtensionsConfig(object): ext_handler.properties.version = getattrib(plugin, "version") ext_handler.properties.state = getattrib(plugin, "state") + ext_handler.properties.upgradeGuid = getattrib(plugin, "upgradeGuid") + if not ext_handler.properties.upgradeGuid: + ext_handler.properties.upgradeGuid = None auto_upgrade = getattrib(plugin, "autoUpgrade") if auto_upgrade is not None and auto_upgrade.lower() == "true": ext_handler.properties.upgradePolicy = "auto" @@ -1418,6 +1472,7 @@ class ExtensionManifest(object): raise ValueError("ExtensionManifest is None") logger.verbose("Load ExtensionManifest.xml") self.pkg_list = ExtHandlerPackageList() + self.allowed_versions = None self.parse(xml_text) def parse(self, xml_text): |