diff options
Diffstat (limited to 'azurelinuxagent/ga')
-rw-r--r-- | azurelinuxagent/ga/__init__.py | 17 | ||||
-rw-r--r-- | azurelinuxagent/ga/env.py | 112 | ||||
-rw-r--r-- | azurelinuxagent/ga/exthandlers.py | 902 | ||||
-rw-r--r-- | azurelinuxagent/ga/monitor.py | 192 | ||||
-rw-r--r-- | azurelinuxagent/ga/update.py | 715 |
5 files changed, 1938 insertions, 0 deletions
diff --git a/azurelinuxagent/ga/__init__.py b/azurelinuxagent/ga/__init__.py new file mode 100644 index 0000000..1ea2f38 --- /dev/null +++ b/azurelinuxagent/ga/__init__.py @@ -0,0 +1,17 @@ +# Copyright 2014 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.4+ and Openssl 1.0+ +# + diff --git a/azurelinuxagent/ga/env.py b/azurelinuxagent/ga/env.py new file mode 100644 index 0000000..2d67d4b --- /dev/null +++ b/azurelinuxagent/ga/env.py @@ -0,0 +1,112 @@ +# Microsoft Azure Linux Agent +# +# Copyright 2014 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.4+ and Openssl 1.0+ +# + +import os +import socket +import time +import threading + +import azurelinuxagent.common.conf as conf +import azurelinuxagent.common.logger as logger + +from azurelinuxagent.common.dhcp import get_dhcp_handler +from azurelinuxagent.common.osutil import get_osutil + +def get_env_handler(): + return EnvHandler() + +class EnvHandler(object): + """ + Monitor changes to dhcp and hostname. + If dhcp clinet process re-start has occurred, reset routes, dhcp with fabric. + + Monitor scsi disk. + If new scsi disk found, set timeout + """ + def __init__(self): + self.osutil = get_osutil() + self.dhcp_handler = get_dhcp_handler() + self.stopped = True + self.hostname = None + self.dhcpid = None + self.server_thread=None + + def run(self): + if not self.stopped: + logger.info("Stop existing env monitor service.") + self.stop() + + self.stopped = False + logger.info("Start env monitor service.") + self.dhcp_handler.conf_routes() + self.hostname = socket.gethostname() + self.dhcpid = self.osutil.get_dhcp_pid() + self.server_thread = threading.Thread(target = self.monitor) + self.server_thread.setDaemon(True) + self.server_thread.start() + + def monitor(self): + """ + Monitor dhcp client pid and hostname. + If dhcp clinet process re-start has occurred, reset routes. + """ + while not self.stopped: + self.osutil.remove_rules_files() + timeout = conf.get_root_device_scsi_timeout() + if timeout is not None: + self.osutil.set_scsi_disks_timeout(timeout) + if conf.get_monitor_hostname(): + self.handle_hostname_update() + self.handle_dhclient_restart() + time.sleep(5) + + def handle_hostname_update(self): + curr_hostname = socket.gethostname() + if curr_hostname != self.hostname: + logger.info("EnvMonitor: Detected host name change: {0} -> {1}", + self.hostname, curr_hostname) + self.osutil.set_hostname(curr_hostname) + self.osutil.publish_hostname(curr_hostname) + self.hostname = curr_hostname + + def handle_dhclient_restart(self): + if self.dhcpid is None: + logger.warn("Dhcp client is not running. ") + self.dhcpid = self.osutil.get_dhcp_pid() + return + + #The dhcp process hasn't changed since last check + if self.osutil.check_pid_alive(self.dhcpid.strip()): + return + + newpid = self.osutil.get_dhcp_pid() + if newpid is not None and newpid != self.dhcpid: + logger.info("EnvMonitor: Detected dhcp client restart. " + "Restoring routing table.") + self.dhcp_handler.conf_routes() + self.dhcpid = newpid + + def stop(self): + """ + Stop server comminucation and join the thread to main thread. + """ + self.stopped = True + if self.server_thread is not None: + self.server_thread.join() + diff --git a/azurelinuxagent/ga/exthandlers.py b/azurelinuxagent/ga/exthandlers.py new file mode 100644 index 0000000..d3c8f32 --- /dev/null +++ b/azurelinuxagent/ga/exthandlers.py @@ -0,0 +1,902 @@ +# Microsoft Azure Linux Agent +# +# Copyright 2014 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.4+ and Openssl 1.0+ +# + +import glob +import json +import os +import shutil +import subprocess +import time +import zipfile + +import azurelinuxagent.common.conf as conf +import azurelinuxagent.common.logger as logger +import azurelinuxagent.common.utils.fileutil as fileutil +import azurelinuxagent.common.utils.restutil as restutil +import azurelinuxagent.common.utils.shellutil as shellutil + +from azurelinuxagent.common.event import add_event, WALAEventOperation +from azurelinuxagent.common.exception import ExtensionError, ProtocolError, HttpError +from azurelinuxagent.common.future import ustr +from azurelinuxagent.common.version import AGENT_VERSION +from azurelinuxagent.common.protocol.restapi import ExtHandlerStatus, \ + ExtensionStatus, \ + ExtensionSubStatus, \ + Extension, \ + VMStatus, ExtHandler, \ + get_properties, \ + set_properties +from azurelinuxagent.common.utils.flexible_version import FlexibleVersion +from azurelinuxagent.common.utils.textutil import Version +from azurelinuxagent.common.protocol import get_protocol_util +from azurelinuxagent.common.version import AGENT_NAME, CURRENT_AGENT, CURRENT_VERSION + +#HandlerEnvironment.json schema version +HANDLER_ENVIRONMENT_VERSION = 1.0 + +VALID_EXTENSION_STATUS = ['transitioning', 'error', 'success', 'warning'] + +VALID_HANDLER_STATUS = ['Ready', 'NotReady', "Installing", "Unresponsive"] + +def validate_has_key(obj, key, fullname): + if key not in obj: + raise ExtensionError("Missing: {0}".format(fullname)) + +def validate_in_range(val, valid_range, name): + if val not in valid_range: + raise ExtensionError("Invalid {0}: {1}".format(name, val)) + +def parse_formatted_message(formatted_message): + if formatted_message is None: + return None + validate_has_key(formatted_message, 'lang', 'formattedMessage/lang') + validate_has_key(formatted_message, 'message', 'formattedMessage/message') + return formatted_message.get('message') + +def parse_ext_substatus(substatus): + #Check extension sub status format + validate_has_key(substatus, 'status', 'substatus/status') + validate_in_range(substatus['status'], VALID_EXTENSION_STATUS, + 'substatus/status') + status = ExtensionSubStatus() + status.name = substatus.get('name') + status.status = substatus.get('status') + status.code = substatus.get('code', 0) + formatted_message = substatus.get('formattedMessage') + status.message = parse_formatted_message(formatted_message) + return status + +def parse_ext_status(ext_status, data): + if data is None or len(data) is None: + return + #Currently, only the first status will be reported + data = data[0] + #Check extension status format + validate_has_key(data, 'status', 'status') + status_data = data['status'] + validate_has_key(status_data, 'status', 'status/status') + + validate_in_range(status_data['status'], VALID_EXTENSION_STATUS, + 'status/status') + + applied_time = status_data.get('configurationAppliedTime') + ext_status.configurationAppliedTime = applied_time + ext_status.operation = status_data.get('operation') + ext_status.status = status_data.get('status') + ext_status.code = status_data.get('code', 0) + formatted_message = status_data.get('formattedMessage') + ext_status.message = parse_formatted_message(formatted_message) + substatus_list = status_data.get('substatus') + if substatus_list is None: + return + for substatus in substatus_list: + ext_status.substatusList.append(parse_ext_substatus(substatus)) + +class ExtHandlerState(object): + NotInstalled = "NotInstalled" + Installed = "Installed" + Enabled = "Enabled" + +def get_exthandlers_handler(): + return ExtHandlersHandler() + +class ExtHandlersHandler(object): + def __init__(self): + self.protocol_util = get_protocol_util() + self.ext_handlers = None + self.last_etag = None + self.log_report = False + + def run(self): + self.ext_handlers, etag = None, None + try: + self.protocol = self.protocol_util.get_protocol() + self.ext_handlers, etag = self.protocol.get_ext_handlers() + except ProtocolError as e: + msg = u"Exception retrieving extension handlers: {0}".format( + ustr(e)) + logger.warn(msg) + add_event(AGENT_NAME, version=CURRENT_VERSION, is_success=False, message=msg) + return + + if self.last_etag is not None and self.last_etag == etag: + msg = u"Incarnation {0} has no extension updates".format(etag) + logger.verbose(msg) + self.log_report = False + else: + msg = u"Handle extensions updates for incarnation {0}".format(etag) + logger.info(msg) + self.log_report = True #Log status report success on new config + self.handle_ext_handlers() + self.last_etag = etag + + self.report_ext_handlers_status() + + def run_status(self): + self.report_ext_handlers_status() + return + + def handle_ext_handlers(self): + if self.ext_handlers.extHandlers is None or \ + len(self.ext_handlers.extHandlers) == 0: + logger.info("No ext handler config found") + return + + for ext_handler in self.ext_handlers.extHandlers: + #TODO handle install in sequence, enable in parallel + self.handle_ext_handler(ext_handler) + + def handle_ext_handler(self, ext_handler): + ext_handler_i = ExtHandlerInstance(ext_handler, self.protocol) + try: + state = ext_handler.properties.state + ext_handler_i.logger.info("Expected handler state: {0}", state) + if state == "enabled": + self.handle_enable(ext_handler_i) + elif state == u"disabled": + self.handle_disable(ext_handler_i) + elif state == u"uninstall": + self.handle_uninstall(ext_handler_i) + else: + message = u"Unknown ext handler state:{0}".format(state) + raise ExtensionError(message) + except ExtensionError as e: + ext_handler_i.set_handler_status(message=ustr(e), code=-1) + ext_handler_i.report_event(message=ustr(e), is_success=False) + + def handle_enable(self, ext_handler_i): + + ext_handler_i.decide_version() + + old_ext_handler_i = ext_handler_i.get_installed_ext_handler() + if old_ext_handler_i is not None and \ + old_ext_handler_i.version_gt(ext_handler_i): + raise ExtensionError(u"Downgrade not allowed") + + handler_state = ext_handler_i.get_handler_state() + ext_handler_i.logger.info("Current handler state is: {0}", handler_state) + if handler_state == ExtHandlerState.NotInstalled: + ext_handler_i.set_handler_state(ExtHandlerState.NotInstalled) + + ext_handler_i.download() + + ext_handler_i.update_settings() + + if old_ext_handler_i is None: + ext_handler_i.install() + elif ext_handler_i.version_gt(old_ext_handler_i): + old_ext_handler_i.disable() + ext_handler_i.copy_status_files(old_ext_handler_i) + ext_handler_i.update() + old_ext_handler_i.uninstall() + old_ext_handler_i.rm_ext_handler_dir() + ext_handler_i.update_with_install() + else: + ext_handler_i.update_settings() + + ext_handler_i.enable() + + def handle_disable(self, ext_handler_i): + handler_state = ext_handler_i.get_handler_state() + ext_handler_i.logger.info("Current handler state is: {0}", handler_state) + if handler_state == ExtHandlerState.Enabled: + ext_handler_i.disable() + + def handle_uninstall(self, ext_handler_i): + handler_state = ext_handler_i.get_handler_state() + ext_handler_i.logger.info("Current handler state is: {0}", handler_state) + if handler_state != ExtHandlerState.NotInstalled: + if handler_state == ExtHandlerState.Enabled: + ext_handler_i.disable() + ext_handler_i.uninstall() + ext_handler_i.rm_ext_handler_dir() + + def report_ext_handlers_status(self): + """Go thru handler_state dir, collect and report status""" + vm_status = VMStatus() + vm_status.vmAgent.version = str(CURRENT_VERSION) + vm_status.vmAgent.status = "Ready" + vm_status.vmAgent.message = "Guest Agent is running" + + if self.ext_handlers is not None: + for ext_handler in self.ext_handlers.extHandlers: + try: + self.report_ext_handler_status(vm_status, ext_handler) + except ExtensionError as e: + add_event( + AGENT_NAME, + version=CURRENT_VERSION, + is_success=False, + message=ustr(e)) + + logger.verbose("Report vm agent status") + + try: + self.protocol.report_vm_status(vm_status) + except ProtocolError as e: + message = "Failed to report vm agent status: {0}".format(e) + add_event(AGENT_NAME, version=CURRENT_VERSION, is_success=False, message=message) + + if self.log_report: + logger.verbose("Successfully reported vm agent status") + + + def report_ext_handler_status(self, vm_status, ext_handler): + ext_handler_i = ExtHandlerInstance(ext_handler, self.protocol) + + handler_status = ext_handler_i.get_handler_status() + if handler_status is None: + return + + handler_state = ext_handler_i.get_handler_state() + if handler_state != ExtHandlerState.NotInstalled: + try: + active_exts = ext_handler_i.report_ext_status() + handler_status.extensions.extend(active_exts) + except ExtensionError as e: + ext_handler_i.set_handler_status(message=ustr(e), code=-1) + + try: + heartbeat = ext_handler_i.collect_heartbeat() + if heartbeat is not None: + handler_status.status = heartbeat.get('status') + except ExtensionError as e: + ext_handler_i.set_handler_status(message=ustr(e), code=-1) + + vm_status.vmAgent.extensionHandlers.append(handler_status) + +class ExtHandlerInstance(object): + def __init__(self, ext_handler, protocol): + self.ext_handler = ext_handler + self.protocol = protocol + self.operation = None + self.pkg = None + + prefix = "[{0}]".format(self.get_full_name()) + self.logger = logger.Logger(logger.DEFAULT_LOGGER, prefix) + + try: + fileutil.mkdir(self.get_log_dir(), mode=0o744) + except IOError as e: + self.logger.error(u"Failed to create extension log dir: {0}", e) + + log_file = os.path.join(self.get_log_dir(), "CommandExecution.log") + self.logger.add_appender(logger.AppenderType.FILE, + logger.LogLevel.INFO, log_file) + + def decide_version(self): + self.logger.info("Decide which version to use") + try: + pkg_list = self.protocol.get_ext_handler_pkgs(self.ext_handler) + except ProtocolError as e: + raise ExtensionError("Failed to get ext handler pkgs", e) + + # Determine the desired and installed versions + requested_version = FlexibleVersion(self.ext_handler.properties.version) + installed_version = FlexibleVersion(self.get_installed_version()) + if installed_version is None: + installed_version = requested_version + + # Divide packages + # - Find the installed package (its version must exactly match) + # - Find the internal candidate (its version must exactly match) + # - Separate the public packages + internal_pkg = None + installed_pkg = None + public_pkgs = [] + for pkg in pkg_list.versions: + pkg_version = FlexibleVersion(pkg.version) + if pkg_version == installed_version: + installed_pkg = pkg + if pkg.isinternal and pkg_version == requested_version: + internal_pkg = pkg + if not pkg.isinternal: + public_pkgs.append(pkg) + + internal_version = FlexibleVersion(internal_pkg.version) \ + if internal_pkg is not None \ + else FlexibleVersion() + public_pkgs.sort(key=lambda pkg: FlexibleVersion(pkg.version), reverse=True) + + # Determine the preferred version and type of upgrade occurring + preferred_version = max(requested_version, installed_version) + is_major_upgrade = preferred_version.major > installed_version.major + allow_minor_upgrade = self.ext_handler.properties.upgradePolicy == 'auto' + + # Find the first public candidate which + # - Matches the preferred major version + # - Does not upgrade to a new, disallowed major version + # - And only increments the minor version if allowed + # Notes: + # - The patch / hotfix version is not considered + public_pkg = None + for pkg in public_pkgs: + pkg_version = FlexibleVersion(pkg.version) + if pkg_version.major == preferred_version.major \ + and (not pkg.disallow_major_upgrade or not is_major_upgrade) \ + and (allow_minor_upgrade or pkg_version.minor == preferred_version.minor): + public_pkg = pkg + break + + # If there are no candidates, locate the highest public version whose + # major matches that installed + if internal_pkg is None and public_pkg is None: + for pkg in public_pkgs: + pkg_version = FlexibleVersion(pkg.version) + if pkg_version.major == installed_version.major: + public_pkg = pkg + break + + public_version = FlexibleVersion(public_pkg.version) \ + if public_pkg is not None \ + else FlexibleVersion() + + # Select the candidate + # - Use the public candidate if there is no internal candidate or + # the public is more recent (e.g., a hotfix patch) + # - Otherwise use the internal candidate + if internal_pkg is None or (public_pkg is not None and public_version > internal_version): + selected_pkg = public_pkg + else: + selected_pkg = internal_pkg + + selected_version = FlexibleVersion(selected_pkg.version) \ + if selected_pkg is not None \ + else FlexibleVersion() + + # Finally, update the version only if not downgrading + # Note: + # - A downgrade, which will be bound to the same major version, + # is allowed if the installed version is no longer available + if selected_pkg is None \ + or (installed_pkg is not None and selected_version < installed_version): + self.pkg = installed_pkg + self.ext_handler.properties.version = installed_version + else: + self.pkg = selected_pkg + self.ext_handler.properties.version = selected_pkg.version + + if self.pkg is None: + raise ExtensionError("Failed to find any valid extension package") + + self.logger.info("Use version: {0}", self.pkg.version) + return + + def version_gt(self, other): + self_version = self.ext_handler.properties.version + other_version = other.ext_handler.properties.version + return Version(self_version) > Version(other_version) + + def get_installed_ext_handler(self): + lastest_version = self.get_installed_version() + if lastest_version is None: + return None + + installed_handler = ExtHandler() + set_properties("ExtHandler", installed_handler, get_properties(self.ext_handler)) + installed_handler.properties.version = lastest_version + return ExtHandlerInstance(installed_handler, self.protocol) + + def get_installed_version(self): + lastest_version = None + + for path in glob.iglob(os.path.join(conf.get_lib_dir(), self.ext_handler.name + "-*")): + if not os.path.isdir(path): + continue + + separator = path.rfind('-') + version = FlexibleVersion(path[separator+1:]) + + if lastest_version is None or lastest_version < version: + lastest_version = version + + return str(lastest_version) if lastest_version is not None else None + + def copy_status_files(self, old_ext_handler_i): + self.logger.info("Copy status files from old plugin to new") + old_ext_dir = old_ext_handler_i.get_base_dir() + new_ext_dir = self.get_base_dir() + + old_ext_mrseq_file = os.path.join(old_ext_dir, "mrseq") + if os.path.isfile(old_ext_mrseq_file): + shutil.copy2(old_ext_mrseq_file, new_ext_dir) + + old_ext_status_dir = old_ext_handler_i.get_status_dir() + new_ext_status_dir = self.get_status_dir() + + if os.path.isdir(old_ext_status_dir): + for status_file in os.listdir(old_ext_status_dir): + status_file = os.path.join(old_ext_status_dir, status_file) + if os.path.isfile(status_file): + shutil.copy2(status_file, new_ext_status_dir) + + def set_operation(self, op): + self.operation = op + + def report_event(self, message="", is_success=True): + version = self.ext_handler.properties.version + add_event(name=self.ext_handler.name, version=version, message=message, + op=self.operation, is_success=is_success) + + def download(self): + self.logger.info("Download extension package") + self.set_operation(WALAEventOperation.Download) + if self.pkg is None: + raise ExtensionError("No package uri found") + + package = None + for uri in self.pkg.uris: + try: + package = self.protocol.download_ext_handler_pkg(uri.uri) + except ProtocolError as e: + logger.warn("Failed download extension: {0}", e) + + if package is None: + raise ExtensionError("Failed to download extension") + + self.logger.info("Unpack extension package") + pkg_file = os.path.join(conf.get_lib_dir(), + os.path.basename(uri.uri) + ".zip") + try: + fileutil.write_file(pkg_file, bytearray(package), asbin=True) + zipfile.ZipFile(pkg_file).extractall(self.get_base_dir()) + except IOError as e: + raise ExtensionError(u"Failed to write and unzip plugin", e) + + chmod = "find {0} -type f | xargs chmod u+x".format(self.get_base_dir()) + shellutil.run(chmod) + self.report_event(message="Download succeeded") + + self.logger.info("Initialize extension directory") + #Save HandlerManifest.json + man_file = fileutil.search_file(self.get_base_dir(), + 'HandlerManifest.json') + + if man_file is None: + raise ExtensionError("HandlerManifest.json not found") + + try: + man = fileutil.read_file(man_file, remove_bom=True) + fileutil.write_file(self.get_manifest_file(), man) + except IOError as e: + raise ExtensionError(u"Failed to save HandlerManifest.json", e) + + #Create status and config dir + try: + status_dir = self.get_status_dir() + fileutil.mkdir(status_dir, mode=0o700) + conf_dir = self.get_conf_dir() + fileutil.mkdir(conf_dir, mode=0o700) + except IOError as e: + raise ExtensionError(u"Failed to create status or config dir", e) + + #Save HandlerEnvironment.json + self.create_handler_env() + + def enable(self): + self.logger.info("Enable extension.") + self.set_operation(WALAEventOperation.Enable) + + man = self.load_manifest() + self.launch_command(man.get_enable_command(), timeout=300) + self.set_handler_state(ExtHandlerState.Enabled) + self.set_handler_status(status="Ready", message="Plugin enabled") + + def disable(self): + self.logger.info("Disable extension.") + self.set_operation(WALAEventOperation.Disable) + + man = self.load_manifest() + self.launch_command(man.get_disable_command(), timeout=900) + self.set_handler_state(ExtHandlerState.Installed) + self.set_handler_status(status="NotReady", message="Plugin disabled") + + def install(self): + self.logger.info("Install extension.") + self.set_operation(WALAEventOperation.Install) + + man = self.load_manifest() + self.launch_command(man.get_install_command(), timeout=900) + self.set_handler_state(ExtHandlerState.Installed) + + def uninstall(self): + self.logger.info("Uninstall extension.") + self.set_operation(WALAEventOperation.UnInstall) + + try: + man = self.load_manifest() + self.launch_command(man.get_uninstall_command()) + except ExtensionError as e: + self.report_event(message=ustr(e), is_success=False) + + def rm_ext_handler_dir(self): + try: + handler_state_dir = self.get_handler_state_dir() + if os.path.isdir(handler_state_dir): + self.logger.info("Remove ext handler dir: {0}", handler_state_dir) + shutil.rmtree(handler_state_dir) + base_dir = self.get_base_dir() + if os.path.isdir(base_dir): + self.logger.info("Remove ext handler dir: {0}", base_dir) + shutil.rmtree(base_dir) + except IOError as e: + message = "Failed to rm ext handler dir: {0}".format(e) + self.report_event(message=message, is_success=False) + + def update(self): + self.logger.info("Update extension.") + self.set_operation(WALAEventOperation.Update) + + man = self.load_manifest() + self.launch_command(man.get_update_command(), timeout=900) + + def update_with_install(self): + man = self.load_manifest() + if man.is_update_with_install(): + self.install() + else: + self.logger.info("UpdateWithInstall not set. " + "Skip install during upgrade.") + self.set_handler_state(ExtHandlerState.Installed) + + def get_largest_seq_no(self): + seq_no = -1 + conf_dir = self.get_conf_dir() + for item in os.listdir(conf_dir): + item_path = os.path.join(conf_dir, item) + if os.path.isfile(item_path): + try: + seperator = item.rfind(".") + if seperator > 0 and item[seperator + 1:] == 'settings': + curr_seq_no = int(item.split('.')[0]) + if curr_seq_no > seq_no: + seq_no = curr_seq_no + except Exception as e: + self.logger.verbose("Failed to parse file name: {0}", item) + continue + return seq_no + + def collect_ext_status(self, ext): + self.logger.verbose("Collect extension status") + + seq_no = self.get_largest_seq_no() + if seq_no == -1: + return None + + status_dir = self.get_status_dir() + ext_status_file = "{0}.status".format(seq_no) + ext_status_file = os.path.join(status_dir, ext_status_file) + + ext_status = ExtensionStatus(seq_no=seq_no) + try: + data_str = fileutil.read_file(ext_status_file) + data = json.loads(data_str) + parse_ext_status(ext_status, data) + except IOError as e: + ext_status.message = u"Failed to get status file {0}".format(e) + ext_status.code = -1 + ext_status.status = "error" + except ValueError as e: + ext_status.message = u"Malformed status file {0}".format(e) + ext_status.code = -1 + ext_status.status = "error" + + return ext_status + + def report_ext_status(self): + active_exts = [] + for ext in self.ext_handler.properties.extensions: + ext_status = self.collect_ext_status(ext) + if ext_status is None: + continue + try: + self.protocol.report_ext_status(self.ext_handler.name, ext.name, + ext_status) + active_exts.append(ext.name) + except ProtocolError as e: + self.logger.error(u"Failed to report extension status: {0}", e) + return active_exts + + def collect_heartbeat(self): + man = self.load_manifest() + if not man.is_report_heartbeat(): + return + heartbeat_file = os.path.join(conf.get_lib_dir(), + self.get_heartbeat_file()) + + self.logger.info("Collect heart beat") + if not os.path.isfile(heartbeat_file): + raise ExtensionError("Failed to get heart beat file") + if not self.is_responsive(heartbeat_file): + return { + "status": "Unresponsive", + "code": -1, + "message": "Extension heartbeat is not responsive" + } + try: + heartbeat_json = fileutil.read_file(heartbeat_file) + heartbeat = json.loads(heartbeat_json)[0]['heartbeat'] + except IOError as e: + raise ExtensionError("Failed to get heartbeat file:{0}".format(e)) + except ValueError as e: + raise ExtensionError("Malformed heartbeat file: {0}".format(e)) + return heartbeat + + def is_responsive(self, heartbeat_file): + last_update=int(time.time() - os.stat(heartbeat_file).st_mtime) + return last_update > 600 # not updated for more than 10 min + + def launch_command(self, cmd, timeout=300): + self.logger.info("Launch command:{0}", cmd) + base_dir = self.get_base_dir() + try: + devnull = open(os.devnull, 'w') + child = subprocess.Popen(base_dir + "/" + cmd, + shell=True, + cwd=base_dir, + stdout=devnull) + except Exception as e: + #TODO do not catch all exception + raise ExtensionError("Failed to launch: {0}, {1}".format(cmd, e)) + + retry = timeout + while retry > 0 and child.poll() is None: + time.sleep(1) + retry -= 1 + if retry == 0: + os.kill(child.pid, 9) + raise ExtensionError("Timeout({0}): {1}".format(timeout, cmd)) + + ret = child.wait() + if ret == None or ret != 0: + raise ExtensionError("Non-zero exit code: {0}, {1}".format(ret, cmd)) + + self.report_event(message="Launch command succeeded: {0}".format(cmd)) + + def load_manifest(self): + man_file = self.get_manifest_file() + try: + data = json.loads(fileutil.read_file(man_file)) + except IOError as e: + raise ExtensionError('Failed to load manifest file.') + except ValueError as e: + raise ExtensionError('Malformed manifest file.') + + return HandlerManifest(data[0]) + + def update_settings_file(self, settings_file, settings): + settings_file = os.path.join(self.get_conf_dir(), settings_file) + try: + fileutil.write_file(settings_file, settings) + except IOError as e: + raise ExtensionError(u"Failed to update settings file", e) + + def update_settings(self): + if self.ext_handler.properties.extensions is None or \ + len(self.ext_handler.properties.extensions) == 0: + #This is the behavior of waagent 2.0.x + #The new agent has to be consistent with the old one. + self.logger.info("Extension has no settings, write empty 0.settings") + self.update_settings_file("0.settings", "") + return + + for ext in self.ext_handler.properties.extensions: + settings = { + 'publicSettings': ext.publicSettings, + 'protectedSettings': ext.protectedSettings, + 'protectedSettingsCertThumbprint': ext.certificateThumbprint + } + ext_settings = { + "runtimeSettings":[{ + "handlerSettings": settings + }] + } + settings_file = "{0}.settings".format(ext.sequenceNumber) + self.logger.info("Update settings file: {0}", settings_file) + self.update_settings_file(settings_file, json.dumps(ext_settings)) + + def create_handler_env(self): + env = [{ + "name": self.ext_handler.name, + "version" : HANDLER_ENVIRONMENT_VERSION, + "handlerEnvironment" : { + "logFolder" : self.get_log_dir(), + "configFolder" : self.get_conf_dir(), + "statusFolder" : self.get_status_dir(), + "heartbeatFile" : self.get_heartbeat_file() + } + }] + try: + fileutil.write_file(self.get_env_file(), json.dumps(env)) + except IOError as e: + raise ExtensionError(u"Failed to save handler environment", e) + + def get_handler_state_dir(self): + return os.path.join(conf.get_lib_dir(), "handler_state", + self.get_full_name()) + + def set_handler_state(self, handler_state): + state_dir = self.get_handler_state_dir() + if not os.path.exists(state_dir): + try: + fileutil.mkdir(state_dir, 0o700) + except IOError as e: + self.logger.error("Failed to create state dir: {0}", e) + + try: + state_file = os.path.join(state_dir, "state") + fileutil.write_file(state_file, handler_state) + except IOError as e: + self.logger.error("Failed to set state: {0}", e) + + def get_handler_state(self): + state_dir = self.get_handler_state_dir() + state_file = os.path.join(state_dir, "state") + if not os.path.isfile(state_file): + return ExtHandlerState.NotInstalled + + try: + return fileutil.read_file(state_file) + except IOError as e: + self.logger.error("Failed to get state: {0}", e) + return ExtHandlerState.NotInstalled + + def set_handler_status(self, status="NotReady", message="", + code=0): + state_dir = self.get_handler_state_dir() + if not os.path.exists(state_dir): + try: + fileutil.mkdir(state_dir, 0o700) + except IOError as e: + self.logger.error("Failed to create state dir: {0}", e) + + handler_status = ExtHandlerStatus() + handler_status.name = self.ext_handler.name + handler_status.version = self.ext_handler.properties.version + handler_status.message = message + handler_status.code = code + handler_status.status = status + status_file = os.path.join(state_dir, "status") + + try: + fileutil.write_file(status_file, + json.dumps(get_properties(handler_status))) + except (IOError, ValueError, ProtocolError) as e: + self.logger.error("Failed to save handler status: {0}", e) + + def get_handler_status(self): + state_dir = self.get_handler_state_dir() + status_file = os.path.join(state_dir, "status") + if not os.path.isfile(status_file): + return None + + try: + data = json.loads(fileutil.read_file(status_file)) + handler_status = ExtHandlerStatus() + set_properties("ExtHandlerStatus", handler_status, data) + return handler_status + except (IOError, ValueError) as e: + self.logger.error("Failed to get handler status: {0}", e) + + def get_full_name(self): + return "{0}-{1}".format(self.ext_handler.name, + self.ext_handler.properties.version) + + def get_base_dir(self): + return os.path.join(conf.get_lib_dir(), self.get_full_name()) + + def get_status_dir(self): + return os.path.join(self.get_base_dir(), "status") + + def get_conf_dir(self): + return os.path.join(self.get_base_dir(), 'config') + + def get_heartbeat_file(self): + return os.path.join(self.get_base_dir(), 'heartbeat.log') + + def get_manifest_file(self): + return os.path.join(self.get_base_dir(), 'HandlerManifest.json') + + def get_env_file(self): + return os.path.join(self.get_base_dir(), 'HandlerEnvironment.json') + + def get_log_dir(self): + return os.path.join(conf.get_ext_log_dir(), self.ext_handler.name, + self.ext_handler.properties.version) + +class HandlerEnvironment(object): + def __init__(self, data): + self.data = data + + def get_version(self): + return self.data["version"] + + def get_log_dir(self): + return self.data["handlerEnvironment"]["logFolder"] + + def get_conf_dir(self): + return self.data["handlerEnvironment"]["configFolder"] + + def get_status_dir(self): + return self.data["handlerEnvironment"]["statusFolder"] + + def get_heartbeat_file(self): + return self.data["handlerEnvironment"]["heartbeatFile"] + +class HandlerManifest(object): + def __init__(self, data): + if data is None or data['handlerManifest'] is None: + raise ExtensionError('Malformed manifest file.') + self.data = data + + def get_name(self): + return self.data["name"] + + def get_version(self): + return self.data["version"] + + def get_install_command(self): + return self.data['handlerManifest']["installCommand"] + + def get_uninstall_command(self): + return self.data['handlerManifest']["uninstallCommand"] + + def get_update_command(self): + return self.data['handlerManifest']["updateCommand"] + + def get_enable_command(self): + return self.data['handlerManifest']["enableCommand"] + + def get_disable_command(self): + return self.data['handlerManifest']["disableCommand"] + + def is_reboot_after_install(self): + """ + Deprecated + """ + return False + + def is_report_heartbeat(self): + return self.data['handlerManifest'].get('reportHeartbeat', False) + + def is_update_with_install(self): + update_mode = self.data['handlerManifest'].get('updateMode') + if update_mode is None: + return True + return update_mode.low() == "updatewithinstall" diff --git a/azurelinuxagent/ga/monitor.py b/azurelinuxagent/ga/monitor.py new file mode 100644 index 0000000..f49cef8 --- /dev/null +++ b/azurelinuxagent/ga/monitor.py @@ -0,0 +1,192 @@ +# Copyright 2014 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.4+ and Openssl 1.0+ +# + +import datetime +import json +import os +import platform +import time +import threading + +import azurelinuxagent.common.conf as conf +import azurelinuxagent.common.logger as logger + +from azurelinuxagent.common.event import WALAEventOperation, add_event +from azurelinuxagent.common.exception import EventError, ProtocolError, OSUtilError +from azurelinuxagent.common.future import ustr +from azurelinuxagent.common.osutil import get_osutil +from azurelinuxagent.common.protocol import get_protocol_util +from azurelinuxagent.common.protocol.restapi import TelemetryEventParam, \ + TelemetryEventList, \ + TelemetryEvent, \ + set_properties +from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, getattrib +from azurelinuxagent.common.version import DISTRO_NAME, DISTRO_VERSION, \ + DISTRO_CODE_NAME, AGENT_LONG_VERSION, \ + CURRENT_AGENT, CURRENT_VERSION + + +def parse_event(data_str): + try: + return parse_json_event(data_str) + except ValueError: + return parse_xml_event(data_str) + + +def parse_xml_param(param_node): + name = getattrib(param_node, "Name") + value_str = getattrib(param_node, "Value") + attr_type = getattrib(param_node, "T") + value = value_str + if attr_type == 'mt:uint64': + value = int(value_str) + elif attr_type == 'mt:bool': + value = bool(value_str) + elif attr_type == 'mt:float64': + value = float(value_str) + return TelemetryEventParam(name, value) + + +def parse_xml_event(data_str): + try: + xml_doc = parse_doc(data_str) + event_id = getattrib(find(xml_doc, "Event"), 'id') + provider_id = getattrib(find(xml_doc, "Provider"), 'id') + event = TelemetryEvent(event_id, provider_id) + param_nodes = findall(xml_doc, 'Param') + for param_node in param_nodes: + event.parameters.append(parse_xml_param(param_node)) + return event + except Exception as e: + raise ValueError(ustr(e)) + + +def parse_json_event(data_str): + data = json.loads(data_str) + event = TelemetryEvent() + set_properties("TelemetryEvent", event, data) + return event + + +def get_monitor_handler(): + return MonitorHandler() + + +class MonitorHandler(object): + def __init__(self): + self.osutil = get_osutil() + self.protocol_util = get_protocol_util() + self.sysinfo = [] + + def run(self): + self.init_sysinfo() + + event_thread = threading.Thread(target=self.daemon) + event_thread.setDaemon(True) + event_thread.start() + + def init_sysinfo(self): + osversion = "{0}:{1}-{2}-{3}:{4}".format(platform.system(), + DISTRO_NAME, + DISTRO_VERSION, + DISTRO_CODE_NAME, + platform.release()) + self.sysinfo.append(TelemetryEventParam("OSVersion", osversion)) + self.sysinfo.append( + TelemetryEventParam("GAVersion", CURRENT_AGENT)) + + try: + ram = self.osutil.get_total_mem() + processors = self.osutil.get_processor_cores() + self.sysinfo.append(TelemetryEventParam("RAM", ram)) + self.sysinfo.append(TelemetryEventParam("Processors", processors)) + except OSUtilError as e: + logger.warn("Failed to get system info: {0}", e) + + try: + protocol = self.protocol_util.get_protocol() + vminfo = protocol.get_vminfo() + self.sysinfo.append(TelemetryEventParam("VMName", + vminfo.vmName)) + self.sysinfo.append(TelemetryEventParam("TenantName", + vminfo.tenantName)) + self.sysinfo.append(TelemetryEventParam("RoleName", + vminfo.roleName)) + self.sysinfo.append(TelemetryEventParam("RoleInstanceName", + vminfo.roleInstanceName)) + self.sysinfo.append(TelemetryEventParam("ContainerId", + vminfo.containerId)) + except ProtocolError as e: + logger.warn("Failed to get system info: {0}", e) + + def collect_event(self, evt_file_name): + try: + logger.verbose("Found event file: {0}", evt_file_name) + with open(evt_file_name, "rb") as evt_file: + # if fail to open or delete the file, throw exception + data_str = evt_file.read().decode("utf-8", 'ignore') + logger.verbose("Processed event file: {0}", evt_file_name) + os.remove(evt_file_name) + return data_str + except IOError as e: + msg = "Failed to process {0}, {1}".format(evt_file_name, e) + raise EventError(msg) + + def collect_and_send_events(self): + event_list = TelemetryEventList() + event_dir = os.path.join(conf.get_lib_dir(), "events") + event_files = os.listdir(event_dir) + for event_file in event_files: + if not event_file.endswith(".tld"): + continue + event_file_path = os.path.join(event_dir, event_file) + try: + data_str = self.collect_event(event_file_path) + except EventError as e: + logger.error("{0}", e) + continue + + try: + event = parse_event(data_str) + event.parameters.extend(self.sysinfo) + event_list.events.append(event) + except (ValueError, ProtocolError) as e: + logger.warn("Failed to decode event file: {0}", e) + continue + + if len(event_list.events) == 0: + return + + try: + protocol = self.protocol_util.get_protocol() + protocol.report_event(event_list) + except ProtocolError as e: + logger.error("{0}", e) + + def daemon(self): + last_heartbeat = datetime.datetime.min + period = datetime.timedelta(minutes=30) + while True: + if (datetime.datetime.now() - last_heartbeat) > period: + last_heartbeat = datetime.datetime.now() + add_event(op=WALAEventOperation.HeartBeat, name="WALA", + is_success=True) + try: + self.collect_and_send_events() + except Exception as e: + logger.warn("Failed to send events: {0}", e) + time.sleep(60) diff --git a/azurelinuxagent/ga/update.py b/azurelinuxagent/ga/update.py new file mode 100644 index 0000000..e89608a --- /dev/null +++ b/azurelinuxagent/ga/update.py @@ -0,0 +1,715 @@ +# Windows Azure Linux Agent +# +# Copyright 2014 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.4+ and Openssl 1.0+ +# +import glob +import json +import os +import platform +import re +import shlex +import shutil +import signal +import subprocess +import sys +import time +import zipfile + +import azurelinuxagent.common.conf as conf +import azurelinuxagent.common.logger as logger +import azurelinuxagent.common.utils.fileutil as fileutil +import azurelinuxagent.common.utils.restutil as restutil +import azurelinuxagent.common.utils.textutil as textutil + +from azurelinuxagent.common.event import add_event, WALAEventOperation +from azurelinuxagent.common.exception import UpdateError, ProtocolError +from azurelinuxagent.common.future import ustr +from azurelinuxagent.common.osutil import get_osutil +from azurelinuxagent.common.protocol import get_protocol_util +from azurelinuxagent.common.utils.flexible_version import FlexibleVersion +from azurelinuxagent.common.version import AGENT_NAME, AGENT_VERSION, AGENT_LONG_VERSION, \ + AGENT_DIR_GLOB, AGENT_PKG_GLOB, \ + AGENT_PATTERN, AGENT_NAME_PATTERN, AGENT_DIR_PATTERN, \ + CURRENT_AGENT, CURRENT_VERSION, \ + is_current_agent_installed + +from azurelinuxagent.ga.exthandlers import HandlerManifest + + +AGENT_ERROR_FILE = "error.json" # File name for agent error record +AGENT_MANIFEST_FILE = "HandlerManifest.json" + +CHILD_HEALTH_INTERVAL = 15 * 60 +CHILD_LAUNCH_INTERVAL = 5 * 60 +CHILD_LAUNCH_RESTART_MAX = 3 +CHILD_POLL_INTERVAL = 60 + +MAX_FAILURE = 3 # Max failure allowed for agent before blacklisted + +GOAL_STATE_INTERVAL = 25 +REPORT_STATUS_INTERVAL = 15 +RETAIN_INTERVAL = 24 * 60 * 60 # Retain interval for black list + + +def get_update_handler(): + return UpdateHandler() + + +def get_python_cmd(): + major_version = platform.python_version_tuple()[0] + return "python" if int(major_version) <= 2 else "python{0}".format(major_version) + + +class UpdateHandler(object): + + def __init__(self): + self.osutil = get_osutil() + self.protocol_util = get_protocol_util() + + self.running = True + self.last_etag = None + self.last_attempt_time = None + + self.agents = [] + + self.child_agent = None + self.child_launch_time = None + self.child_launch_attempts = 0 + self.child_process = None + + self.signal_handler = None + return + + def run_latest(self): + """ + This method is called from the daemon to find and launch the most + current, downloaded agent. + + Note: + - Most events should be tagged to the launched agent (agent_version) + """ + + if self.child_process is not None: + raise Exception("Illegal attempt to launch multiple goal state Agent processes") + + if self.signal_handler is None: + self.signal_handler = signal.signal(signal.SIGTERM, self.forward_signal) + + latest_agent = self.get_latest_agent() + if latest_agent is None: + logger.info(u"Installed Agent {0} is the most current agent", CURRENT_AGENT) + agent_cmd = "python -u {0} -run-exthandlers".format(sys.argv[0]) + agent_dir = os.getcwd() + agent_name = CURRENT_AGENT + agent_version = CURRENT_VERSION + else: + logger.info(u"Determined Agent {0} to be the latest agent", latest_agent.name) + agent_cmd = latest_agent.get_agent_cmd() + agent_dir = latest_agent.get_agent_dir() + agent_name = latest_agent.name + agent_version = latest_agent.version + + try: + + # Launch the correct Python version for python-based agents + cmds = shlex.split(agent_cmd) + if cmds[0].lower() == "python": + cmds[0] = get_python_cmd() + agent_cmd = " ".join(cmds) + + self._evaluate_agent_health(latest_agent) + + self.child_process = subprocess.Popen( + cmds, + cwd=agent_dir, + stdout=sys.stdout, + stderr=sys.stderr) + + logger.info(u"Agent {0} launched with command '{1}'", agent_name, agent_cmd) + + ret = None + start_time = time.time() + while (time.time() - start_time) < CHILD_HEALTH_INTERVAL: + time.sleep(CHILD_POLL_INTERVAL) + ret = self.child_process.poll() + if ret is not None: + break + + if ret is None or ret <= 0: + msg = u"Agent {0} launched with command '{1}' is successfully running".format( + agent_name, + agent_cmd) + logger.info(msg) + add_event( + AGENT_NAME, + version=agent_version, + op=WALAEventOperation.Enable, + is_success=True, + message=msg) + + if ret is None: + ret = self.child_process.wait() + + else: + msg = u"Agent {0} launched with command '{1}' failed with return code: {2}".format( + agent_name, + agent_cmd, + ret) + logger.warn(msg) + add_event( + AGENT_NAME, + version=agent_version, + op=WALAEventOperation.Enable, + is_success=False, + message=msg) + + if ret is not None and ret > 0: + msg = u"Agent {0} launched with command '{1}' returned code: {2}".format( + agent_name, + agent_cmd, + ret) + logger.warn(msg) + if latest_agent is not None: + latest_agent.mark_failure() + + except Exception as e: + msg = u"Agent {0} launched with command '{1}' failed with exception: {2}".format( + agent_name, + agent_cmd, + ustr(e)) + logger.warn(msg) + add_event( + AGENT_NAME, + version=agent_version, + op=WALAEventOperation.Enable, + is_success=False, + message=msg) + if latest_agent is not None: + latest_agent.mark_failure(is_fatal=True) + + self.child_process = None + return + + def run(self): + """ + This is the main loop which watches for agent and extension updates. + """ + + logger.info(u"Agent {0} is running as the goal state agent", CURRENT_AGENT) + + # Launch monitoring threads + from azurelinuxagent.ga.monitor import get_monitor_handler + get_monitor_handler().run() + + from azurelinuxagent.ga.env import get_env_handler + get_env_handler().run() + + from azurelinuxagent.ga.exthandlers import get_exthandlers_handler + exthandlers_handler = get_exthandlers_handler() + + # TODO: Add means to stop running + try: + while self.running: + if self._ensure_latest_agent(): + if len(self.agents) > 0: + logger.info( + u"Agent {0} discovered {1} as an update and will exit", + CURRENT_AGENT, + self.agents[0].name) + break + + exthandlers_handler.run() + + time.sleep(25) + + except Exception as e: + logger.warn(u"Agent {0} failed with exception: {1}", CURRENT_AGENT, ustr(e)) + sys.exit(1) + + sys.exit(0) + return + + def forward_signal(self, signum, frame): + if self.child_process is None: + return + + logger.info( + u"Agent {0} forwarding signal {1} to {2}", + CURRENT_AGENT, + signum, + self.child_agent.name if self.child_agent is not None else CURRENT_AGENT) + self.child_process.send_signal(signum) + + if self.signal_handler not in (None, signal.SIG_IGN, signal.SIG_DFL): + self.signal_handler(signum, frame) + elif self.signal_handler is signal.SIG_DFL: + if signum == signal.SIGTERM: + sys.exit(0) + return + + def get_latest_agent(self): + """ + If autoupdate is enabled, return the most current, downloaded, + non-blacklisted agent (if any). + Otherwise, return None (implying to use the installed agent). + """ + + if not conf.get_autoupdate_enabled(): + return None + + self._load_agents() + available_agents = [agent for agent in self.agents if agent.is_available] + return available_agents[0] if len(available_agents) >= 1 else None + + def _ensure_latest_agent(self, base_version=CURRENT_VERSION): + # Ignore new agents if updating is disabled + if not conf.get_autoupdate_enabled(): + return False + + now = time.time() + if self.last_attempt_time is not None: + next_attempt_time = self.last_attempt_time + conf.get_autoupdate_frequency() + else: + next_attempt_time = now + if next_attempt_time > now: + return False + + family = conf.get_autoupdate_gafamily() + logger.info("Checking for agent family {0} updates", family) + + self.last_attempt_time = now + try: + protocol = self.protocol_util.get_protocol() + manifest_list, etag = protocol.get_vmagent_manifests() + except Exception as e: + msg = u"Exception retrieving agent manifests: {0}".format(ustr(e)) + logger.warn(msg) + add_event( + AGENT_NAME, + op=WALAEventOperation.Download, + version=CURRENT_VERSION, + is_success=False, + message=msg) + return False + + if self.last_etag is not None and self.last_etag == etag: + logger.info(u"Incarnation {0} has no agent updates", etag) + return False + + manifests = [m for m in manifest_list.vmAgentManifests if m.family == family] + if len(manifests) == 0: + logger.info(u"Incarnation {0} has no agent family {1} updates", etag, family) + return False + + try: + pkg_list = protocol.get_vmagent_pkgs(manifests[0]) + except ProtocolError as e: + msg= u"Incarnation {0} failed to get {1} package list: {2}".format( + etag, + family, + ustr(e)) + logger.warn(msg) + add_event( + AGENT_NAME, + op=WALAEventOperation.Download, + version=CURRENT_VERSION, + is_success=False, + message=msg) + return False + + # Set the agents to those available for download at least as current as the existing agent + # and remove from disk any agent no longer reported to the VM. + # Note: + # The code leaves on disk available, but blacklisted, agents so as to preserve the state. + # Otherwise, those agents could be again downloaded and inappropriately retried. + self._set_agents([GuestAgent(pkg=pkg) for pkg in pkg_list.versions]) + self._purge_agents() + self._filter_blacklisted_agents() + + # Return True if agents more recent than the current are available + return len(self.agents) > 0 and self.agents[0].version > base_version + + def _evaluate_agent_health(self, latest_agent): + """ + Evaluate the health of the selected agent: If it is restarting + too frequently, raise an Exception to force blacklisting. + """ + if latest_agent is None: + self.child_agent = None + return + + if self.child_agent is None or latest_agent.version != self.child_agent.version: + self.child_agent = latest_agent + self.child_launch_time = None + self.child_launch_attempts = 0 + + if self.child_launch_time is None: + self.child_launch_time = time.time() + + self.child_launch_attempts += 1 + + if (time.time() - self.child_launch_time) <= CHILD_LAUNCH_INTERVAL \ + and self.child_launch_attempts >= CHILD_LAUNCH_RESTART_MAX: + msg = u"Agent {0} restarted more than {1} times in {2} seconds".format( + self.child_agent.name, + CHILD_LAUNCH_RESTART_MAX, + CHILD_LAUNCH_INTERVAL) + raise Exception(msg) + return + + def _filter_blacklisted_agents(self): + self.agents = [agent for agent in self.agents if not agent.is_blacklisted] + return + + def _load_agents(self): + """ + Load all non-blacklisted agents currently on disk. + """ + if len(self.agents) <= 0: + try: + path = os.path.join(conf.get_lib_dir(), "{0}-*".format(AGENT_NAME)) + self._set_agents([GuestAgent(path=agent_dir) + for agent_dir in glob.iglob(path) if os.path.isdir(agent_dir)]) + self._filter_blacklisted_agents() + except Exception as e: + logger.warn(u"Exception occurred loading available agents: {0}", ustr(e)) + return + + def _purge_agents(self): + """ + Remove from disk all directories and .zip files of unknown agents + (without removing the current, running agent). + """ + path = os.path.join(conf.get_lib_dir(), "{0}-*".format(AGENT_NAME)) + + known_versions = [agent.version for agent in self.agents] + if not is_current_agent_installed() and CURRENT_VERSION not in known_versions: + logger.warn( + u"Running Agent {0} was not found in the agent manifest - adding to list", + CURRENT_VERSION) + known_versions.append(CURRENT_VERSION) + + for agent_path in glob.iglob(path): + try: + name = fileutil.trim_ext(agent_path, "zip") + m = AGENT_DIR_PATTERN.match(name) + if m is not None and FlexibleVersion(m.group(1)) not in known_versions: + if os.path.isfile(agent_path): + logger.info(u"Purging outdated Agent file {0}", agent_path) + os.remove(agent_path) + else: + logger.info(u"Purging outdated Agent directory {0}", agent_path) + shutil.rmtree(agent_path) + except Exception as e: + logger.warn(u"Purging {0} raised exception: {1}", agent_path, ustr(e)) + return + + def _set_agents(self, agents=[]): + self.agents = agents + self.agents.sort(key=lambda agent: agent.version, reverse=True) + return + + +class GuestAgent(object): + def __init__(self, path=None, pkg=None): + self.pkg = pkg + version = None + if path is not None: + m = AGENT_DIR_PATTERN.match(path) + if m == None: + raise UpdateError(u"Illegal agent directory: {0}".format(path)) + version = m.group(1) + elif self.pkg is not None: + version = pkg.version + + if version == None: + raise UpdateError(u"Illegal agent version: {0}".format(version)) + self.version = FlexibleVersion(version) + + location = u"disk" if path is not None else u"package" + logger.info(u"Instantiating Agent {0} from {1}", self.name, location) + + self.error = None + self._load_error() + self._ensure_downloaded() + return + + @property + def name(self): + return "{0}-{1}".format(AGENT_NAME, self.version) + + def get_agent_cmd(self): + return self.manifest.get_enable_command() + + def get_agent_dir(self): + return os.path.join(conf.get_lib_dir(), self.name) + + def get_agent_error_file(self): + return os.path.join(conf.get_lib_dir(), self.name, AGENT_ERROR_FILE) + + def get_agent_manifest_path(self): + return os.path.join(self.get_agent_dir(), AGENT_MANIFEST_FILE) + + def get_agent_pkg_path(self): + return ".".join((os.path.join(conf.get_lib_dir(), self.name), "zip")) + + def clear_error(self): + self.error.clear() + return + + @property + def is_available(self): + return self.is_downloaded and not self.is_blacklisted + + @property + def is_blacklisted(self): + return self.error is not None and self.error.is_blacklisted + + @property + def is_downloaded(self): + return self.is_blacklisted or os.path.isfile(self.get_agent_manifest_path()) + + def mark_failure(self, is_fatal=False): + try: + if not os.path.isdir(self.get_agent_dir()): + os.makedirs(self.get_agent_dir()) + self.error.mark_failure(is_fatal=is_fatal) + self.error.save() + if is_fatal: + logger.warn(u"Agent {0} is permanently blacklisted", self.name) + except Exception as e: + logger.warn(u"Agent {0} failed recording error state: {1}", self.name, ustr(e)) + return + + def _ensure_downloaded(self): + try: + logger.info(u"Ensuring Agent {0} is downloaded", self.name) + + if self.is_blacklisted: + logger.info(u"Agent {0} is blacklisted - skipping download", self.name) + return + + if self.is_downloaded: + logger.info(u"Agent {0} was previously downloaded - skipping download", self.name) + self._load_manifest() + return + + if self.pkg is None: + raise UpdateError(u"Agent {0} is missing package and download URIs".format( + self.name)) + + self._download() + self._unpack() + self._load_manifest() + self._load_error() + + msg = u"Agent {0} downloaded successfully".format(self.name) + logger.info(msg) + add_event( + AGENT_NAME, + version=self.version, + op=WALAEventOperation.Install, + is_success=True, + message=msg) + + except Exception as e: + # Note the failure, blacklist the agent if the package downloaded + # - An exception with a downloaded package indicates the package + # is corrupt (e.g., missing the HandlerManifest.json file) + self.mark_failure(is_fatal=os.path.isfile(self.get_agent_pkg_path())) + + msg = u"Agent {0} download failed with exception: {1}".format(self.name, ustr(e)) + logger.warn(msg) + add_event( + AGENT_NAME, + version=self.version, + op=WALAEventOperation.Install, + is_success=False, + message=msg) + return + + def _download(self): + package = None + + for uri in self.pkg.uris: + try: + resp = restutil.http_get(uri.uri, chk_proxy=True) + if resp.status == restutil.httpclient.OK: + package = resp.read() + fileutil.write_file(self.get_agent_pkg_path(), bytearray(package), asbin=True) + logger.info(u"Agent {0} downloaded from {1}", self.name, uri.uri) + break + except restutil.HttpError as e: + logger.warn(u"Agent {0} download from {1} failed", self.name, uri.uri) + + if not os.path.isfile(self.get_agent_pkg_path()): + msg = u"Unable to download Agent {0} from any URI".format(self.name) + add_event( + AGENT_NAME, + op=WALAEventOperation.Download, + version=CURRENT_VERSION, + is_success=False, + message=msg) + raise UpdateError(msg) + return + + def _load_error(self): + try: + if self.error is None: + self.error = GuestAgentError(self.get_agent_error_file()) + self.error.load() + logger.info(u"Agent {0} error state: {1}", self.name, ustr(self.error)) + except Exception as e: + logger.warn(u"Agent {0} failed loading error state: {1}", self.name, ustr(e)) + return + + def _load_manifest(self): + path = self.get_agent_manifest_path() + if not os.path.isfile(path): + msg = u"Agent {0} is missing the {1} file".format(self.name, AGENT_MANIFEST_FILE) + raise UpdateError(msg) + + with open(path, "r") as manifest_file: + try: + manifests = json.load(manifest_file) + except Exception as e: + msg = u"Agent {0} has a malformed {1}".format(self.name, AGENT_MANIFEST_FILE) + raise UpdateError(msg) + if type(manifests) is list: + if len(manifests) <= 0: + msg = u"Agent {0} has an empty {1}".format(self.name, AGENT_MANIFEST_FILE) + raise UpdateError(msg) + manifest = manifests[0] + else: + manifest = manifests + + try: + self.manifest = HandlerManifest(manifest) + if len(self.manifest.get_enable_command()) <= 0: + raise Exception(u"Manifest is missing the enable command") + except Exception as e: + msg = u"Agent {0} has an illegal {1}: {2}".format( + self.name, + AGENT_MANIFEST_FILE, + ustr(e)) + raise UpdateError(msg) + + logger.info( + u"Agent {0} loaded manifest from {1}", + self.name, + self.get_agent_manifest_path()) + logger.verbose(u"Successfully loaded Agent {0} {1}: {2}", + self.name, + AGENT_MANIFEST_FILE, + ustr(self.manifest.data)) + return + + def _unpack(self): + try: + if os.path.isdir(self.get_agent_dir()): + shutil.rmtree(self.get_agent_dir()) + + zipfile.ZipFile(self.get_agent_pkg_path()).extractall(self.get_agent_dir()) + + except Exception as e: + msg = u"Exception unpacking Agent {0} from {1}: {2}".format( + self.name, + self.get_agent_pkg_path(), + ustr(e)) + raise UpdateError(msg) + + if not os.path.isdir(self.get_agent_dir()): + msg = u"Unpacking Agent {0} failed to create directory {1}".format( + self.name, + self.get_agent_dir()) + raise UpdateError(msg) + + logger.info( + u"Agent {0} unpacked successfully to {1}", + self.name, + self.get_agent_dir()) + return + + +class GuestAgentError(object): + def __init__(self, path): + if path is None: + raise UpdateError(u"GuestAgentError requires a path") + self.path = path + + self.clear() + self.load() + return + + def mark_failure(self, is_fatal=False): + self.last_failure = time.time() + self.failure_count += 1 + self.was_fatal = is_fatal + return + + def clear(self): + self.last_failure = 0.0 + self.failure_count = 0 + self.was_fatal = False + return + + def clear_old_failure(self): + if self.last_failure <= 0.0: + return + if self.last_failure < (time.time() - RETAIN_INTERVAL): + self.clear() + return + + @property + def is_blacklisted(self): + return self.was_fatal or self.failure_count >= MAX_FAILURE + + def load(self): + if self.path is not None and os.path.isfile(self.path): + with open(self.path, 'r') as f: + self.from_json(json.load(f)) + return + + def save(self): + if os.path.isdir(os.path.dirname(self.path)): + with open(self.path, 'w') as f: + json.dump(self.to_json(), f) + return + + def from_json(self, data): + self.last_failure = max( + self.last_failure, + data.get(u"last_failure", 0.0)) + self.failure_count = max( + self.failure_count, + data.get(u"failure_count", 0)) + self.was_fatal = self.was_fatal or data.get(u"was_fatal", False) + return + + def to_json(self): + data = { + u"last_failure": self.last_failure, + u"failure_count": self.failure_count, + u"was_fatal" : self.was_fatal + } + return data + + def __str__(self): + return "Last Failure: {0}, Total Failures: {1}, Fatal: {2}".format( + self.last_failure, + self.failure_count, + self.was_fatal) |