diff options
Diffstat (limited to 'azurelinuxagent/ga')
-rw-r--r-- | azurelinuxagent/ga/env.py | 147 | ||||
-rw-r--r-- | azurelinuxagent/ga/exthandlers.py | 236 | ||||
-rw-r--r-- | azurelinuxagent/ga/monitor.py | 43 | ||||
-rw-r--r-- | azurelinuxagent/ga/update.py | 280 |
4 files changed, 462 insertions, 244 deletions
diff --git a/azurelinuxagent/ga/env.py b/azurelinuxagent/ga/env.py index 45b10bb..d9b7d82 100644 --- a/azurelinuxagent/ga/env.py +++ b/azurelinuxagent/ga/env.py @@ -17,11 +17,16 @@ # Requires Python 2.4+ and Openssl 1.0+ # +import re import os import socket import time import threading +import operator + +import datetime + import azurelinuxagent.common.conf as conf import azurelinuxagent.common.logger as logger @@ -29,15 +34,29 @@ from azurelinuxagent.common.dhcp import get_dhcp_handler from azurelinuxagent.common.event import add_periodic, WALAEventOperation from azurelinuxagent.common.osutil import get_osutil from azurelinuxagent.common.protocol import get_protocol_util +from azurelinuxagent.common.protocol.wire import INCARNATION_FILE_NAME +from azurelinuxagent.common.utils import fileutil from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION +CACHE_PATTERNS = [ + re.compile("^(.*)\.(\d+)\.(agentsManifest)$", re.IGNORECASE), + re.compile("^(.*)\.(\d+)\.(manifest\.xml)$", re.IGNORECASE), + re.compile("^(.*)\.(\d+)\.(xml)$", re.IGNORECASE) +] + +MAXIMUM_CACHED_FILES = 50 + +CACHE_PURGE_INTERVAL = datetime.timedelta(hours=24) + + def get_env_handler(): return EnvHandler() + class EnvHandler(object): """ Monitor changes to dhcp and hostname. - If dhcp clinet process re-start has occurred, reset routes, dhcp with fabric. + If dhcp client process re-start has occurred, reset routes, dhcp with fabric. Monitor scsi disk. If new scsi disk found, set timeout @@ -48,9 +67,10 @@ class EnvHandler(object): self.protocol_util = get_protocol_util() self.stopped = True self.hostname = None - self.dhcpid = None + self.dhcp_id = None self.server_thread = None self.dhcp_warning_enabled = True + self.last_purge = None def run(self): if not self.stopped: @@ -61,7 +81,7 @@ class EnvHandler(object): logger.info("Start env monitor service.") self.dhcp_handler.conf_routes() self.hostname = self.osutil.get_hostname_record() - self.dhcpid = self.osutil.get_dhcp_pid() + self.dhcp_id = self.osutil.get_dhcp_pid() self.server_thread = threading.Thread(target=self.monitor) self.server_thread.setDaemon(True) self.server_thread.start() @@ -70,26 +90,24 @@ class EnvHandler(object): """ Monitor firewall rules Monitor dhcp client pid and hostname. - If dhcp clinet process re-start has occurred, reset routes. + If dhcp client process re-start has occurred, reset routes. + Purge unnecessary files from disk cache. """ protocol = self.protocol_util.get_protocol() while not self.stopped: self.osutil.remove_rules_files() - # Disable setting firewall for now, regardless of configuration switch - # if conf.enable_firewall(): - # success = self.osutil.enable_firewall( - # dst_ip=protocol.endpoint, - # uid=os.getuid()) - # add_periodic( - # logger.EVERY_HOUR, - # AGENT_NAME, - # version=CURRENT_VERSION, - # op=WALAEventOperation.Firewall, - # is_success=success, - # log_event=True) - - self.osutil.remove_firewall() + if conf.enable_firewall(): + success = self.osutil.enable_firewall( + dst_ip=protocol.endpoint, + uid=os.getuid()) + add_periodic( + logger.EVERY_HOUR, + AGENT_NAME, + version=CURRENT_VERSION, + op=WALAEventOperation.Firewall, + is_success=success, + log_event=False) timeout = conf.get_root_device_scsi_timeout() if timeout is not None: @@ -100,6 +118,8 @@ class EnvHandler(object): self.handle_dhclient_restart() + self.purge_disk_cache() + time.sleep(5) def handle_hostname_update(self): @@ -113,30 +133,95 @@ class EnvHandler(object): self.hostname = curr_hostname def handle_dhclient_restart(self): - if self.dhcpid is None: + if self.dhcp_id is None: if self.dhcp_warning_enabled: logger.warn("Dhcp client is not running. ") - self.dhcpid = self.osutil.get_dhcp_pid() + self.dhcp_id = self.osutil.get_dhcp_pid() # disable subsequent error logging - self.dhcp_warning_enabled = self.dhcpid is not None + self.dhcp_warning_enabled = self.dhcp_id is not None + return + + # the dhcp process has not changed since the last check + if self.osutil.check_pid_alive(self.dhcp_id.strip()): return - #The dhcp process hasn't changed since last check - if self.osutil.check_pid_alive(self.dhcpid.strip()): + new_pid = self.osutil.get_dhcp_pid() + if new_pid is not None and new_pid != self.dhcp_id: + logger.info("EnvMonitor: Detected dhcp client restart. " + "Restoring routing table.") + self.dhcp_handler.conf_routes() + self.dhcp_id = new_pid + + def purge_disk_cache(self): + """ + Ensure the number of cached files does not exceed a maximum count. + Purge only once per interval, and never delete files related to the + current incarnation. + """ + if self.last_purge is not None \ + and datetime.datetime.utcnow() < \ + self.last_purge + CACHE_PURGE_INTERVAL: + return + + current_incarnation = -1 + self.last_purge = datetime.datetime.utcnow() + incarnation_file = os.path.join(conf.get_lib_dir(), + INCARNATION_FILE_NAME) + if os.path.exists(incarnation_file): + last_incarnation = fileutil.read_file(incarnation_file) + if last_incarnation is not None: + current_incarnation = int(last_incarnation) + + logger.info("Purging disk cache, current incarnation is {0}" + .format('not found' + if current_incarnation == -1 + else current_incarnation)) + + # Create tuples: (prefix, suffix, incarnation, name, file_modified) + files = [] + for f in os.listdir(conf.get_lib_dir()): + full_path = os.path.join(conf.get_lib_dir(), f) + for pattern in CACHE_PATTERNS: + m = pattern.match(f) + if m is not None: + prefix = m.group(1) + suffix = m.group(3) + incarnation = int(m.group(2)) + file_modified = os.path.getmtime(full_path) + t = (prefix, suffix, incarnation, f, file_modified) + files.append(t) + break + + if len(files) <= 0: return - newpid = self.osutil.get_dhcp_pid() - if newpid is not None and newpid != self.dhcpid: - logger.info("EnvMonitor: Detected dhcp client restart. " - "Restoring routing table.") - self.dhcp_handler.conf_routes() - self.dhcpid = newpid + # Sort by (prefix, suffix, file_modified) in reverse order + files = sorted(files, key=operator.itemgetter(0, 1, 4), reverse=True) + + # Remove any files in excess of the maximum allowed + # -- Restart then whenever the (prefix, suffix) change + count = 0 + last_match = [None, None] + for f in files: + if last_match != f[0:2]: + last_match = f[0:2] + count = 0 + + if current_incarnation == f[2]: + logger.verbose("Skipping {0}".format(f[3])) + continue + + count += 1 + + if count > MAXIMUM_CACHED_FILES: + full_name = os.path.join(conf.get_lib_dir(), f[3]) + logger.verbose("Deleting {0}".format(full_name)) + os.remove(full_name) def stop(self): """ - Stop server comminucation and join the thread to main thread. + Stop server communication and join the thread to main thread. """ self.stopped = True if self.server_thread is not None: self.server_thread.join() - diff --git a/azurelinuxagent/ga/exthandlers.py b/azurelinuxagent/ga/exthandlers.py index f0a3b09..cc1796b 100644 --- a/azurelinuxagent/ga/exthandlers.py +++ b/azurelinuxagent/ga/exthandlers.py @@ -17,10 +17,12 @@ # Requires Python 2.4+ and Openssl 1.0+ # +import datetime import glob import json import os import os.path +import random import re import shutil import stat @@ -31,25 +33,22 @@ import zipfile import azurelinuxagent.common.conf as conf import azurelinuxagent.common.logger as logger import azurelinuxagent.common.utils.fileutil as fileutil -import azurelinuxagent.common.utils.restutil as restutil -import azurelinuxagent.common.utils.shellutil as shellutil import azurelinuxagent.common.version as version +from azurelinuxagent.common.errorstate import ErrorState, ERROR_STATE_DELTA -from azurelinuxagent.common.event import add_event, WALAEventOperation -from azurelinuxagent.common.exception import ExtensionError, ProtocolError, HttpError +from azurelinuxagent.common.event import add_event, WALAEventOperation, elapsed_milliseconds +from azurelinuxagent.common.exception import ExtensionError, ProtocolError, RestartError from azurelinuxagent.common.future import ustr -from azurelinuxagent.common.version import AGENT_VERSION from azurelinuxagent.common.protocol.restapi import ExtHandlerStatus, \ ExtensionStatus, \ ExtensionSubStatus, \ - Extension, \ VMStatus, ExtHandler, \ get_properties, \ set_properties from azurelinuxagent.common.utils.flexible_version import FlexibleVersion -from azurelinuxagent.common.utils.textutil import Version from azurelinuxagent.common.protocol import get_protocol_util -from azurelinuxagent.common.version import AGENT_NAME, CURRENT_AGENT, CURRENT_VERSION +from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION + #HandlerEnvironment.json schema version HANDLER_ENVIRONMENT_VERSION = 1.0 @@ -64,7 +63,6 @@ HANDLER_PKG_EXT = ".zip" HANDLER_PKG_PATTERN = re.compile(HANDLER_PATTERN+"\\"+HANDLER_PKG_EXT+"$", re.IGNORECASE) - def validate_has_key(obj, key, fullname): if key not in obj: raise ExtensionError("Missing: {0}".format(fullname)) @@ -176,35 +174,73 @@ class ExtHandlersHandler(object): self.protocol = None self.ext_handlers = None self.last_etag = None + self.last_upgrade_guids = {} self.log_report = False self.log_etag = True + self.log_process = False + + self.report_status_error_state = ErrorState() def run(self): self.ext_handlers, etag = None, None try: self.protocol = self.protocol_util.get_protocol() self.ext_handlers, etag = self.protocol.get_ext_handlers() - except ProtocolError as e: + except Exception as e: msg = u"Exception retrieving extension handlers: {0}".format( ustr(e)) logger.warn(msg) - add_event(AGENT_NAME, version=CURRENT_VERSION, is_success=False, message=msg) + add_event(AGENT_NAME, + version=CURRENT_VERSION, + op=WALAEventOperation.ExtensionProcessing, + is_success=False, + message=msg) return - msg = u"Handle extensions updates for incarnation {0}".format(etag) - logger.verbose(msg) - # Log status report success on new config - self.log_report = True - self.handle_ext_handlers(etag) - self.last_etag = etag - - self.report_ext_handlers_status() - self.cleanup_outdated_handlers() + try: + msg = u"Handle extensions updates for incarnation {0}".format(etag) + logger.verbose(msg) + # Log status report success on new config + self.log_report = True + self.handle_ext_handlers(etag) + self.last_etag = etag + + self.report_ext_handlers_status() + self.cleanup_outdated_handlers() + except RestartError: + raise + except Exception as e: + msg = u"Exception processing extension handlers: {0}".format( + ustr(e)) + logger.warn(msg) + add_event(AGENT_NAME, + version=CURRENT_VERSION, + op=WALAEventOperation.ExtensionProcessing, + is_success=False, + message=msg) + return def run_status(self): self.report_ext_handlers_status() return + def get_upgrade_guid(self, name): + return self.last_upgrade_guids.get(name, (None, False))[0] + + def get_log_upgrade_guid(self, ext_handler): + return self.last_upgrade_guids.get(ext_handler.name, (None, False))[1] + + def set_log_upgrade_guid(self, ext_handler, log_val): + guid = self.get_upgrade_guid(ext_handler.name) + if guid is not None: + self.last_upgrade_guids[ext_handler.name] = (guid, log_val) + + def is_new_guid(self, ext_handler): + last_guid = self.get_upgrade_guid(ext_handler.name) + if last_guid is None: + return True + return last_guid != ext_handler.properties.upgradeGuid + def cleanup_outdated_handlers(self): handlers = [] pkgs = [] @@ -288,7 +324,29 @@ class ExtHandlersHandler(object): ext_handler_i = ExtHandlerInstance(ext_handler, self.protocol) try: - ext_handler_i.decide_version() + state = ext_handler.properties.state + # The extension is to be enabled, there is an upgrade GUID + # and the GUID is NOT new + if state == u"enabled" and \ + ext_handler.properties.upgradeGuid is not None and \ + not self.is_new_guid(ext_handler): + ext_handler_i.ext_handler.properties.version = ext_handler_i.get_installed_version() + ext_handler_i.set_logger() + if self.last_etag != etag: + self.set_log_upgrade_guid(ext_handler, True) + + msg = "New GUID is the same as the old GUID. Exiting without upgrading." + if self.get_log_upgrade_guid(ext_handler): + ext_handler_i.logger.info(msg) + self.set_log_upgrade_guid(ext_handler, False) + ext_handler_i.set_handler_state(ExtHandlerState.Enabled) + ext_handler_i.set_handler_status(status="Ready", message="No change") + ext_handler_i.set_operation(WALAEventOperation.SkipUpdate) + ext_handler_i.report_event(message=ustr(msg), is_success=True) + return + + self.set_log_upgrade_guid(ext_handler, True) + ext_handler_i.decide_version(etag=etag, target_state=state) if not ext_handler_i.is_upgrade and self.last_etag == etag: if self.log_etag: ext_handler_i.logger.verbose("Version {0} is current for etag {1}", @@ -299,22 +357,34 @@ class ExtHandlersHandler(object): self.log_etag = True - state = ext_handler.properties.state ext_handler_i.logger.info("Target handler state: {0}", state) if state == u"enabled": self.handle_enable(ext_handler_i) + if ext_handler.properties.upgradeGuid is not None: + ext_handler_i.logger.info("New Upgrade GUID: {0}", ext_handler.properties.upgradeGuid) + self.last_upgrade_guids[ext_handler.name] = (ext_handler.properties.upgradeGuid, True) elif state == u"disabled": self.handle_disable(ext_handler_i) + # Remove the GUID from the dictionary so that it is upgraded upon re-enable + self.last_upgrade_guids.pop(ext_handler.name, None) elif state == u"uninstall": self.handle_uninstall(ext_handler_i) + # Remove the GUID from the dictionary so that it is upgraded upon re-install + self.last_upgrade_guids.pop(ext_handler.name, None) else: message = u"Unknown ext handler state:{0}".format(state) raise ExtensionError(message) - except ExtensionError as e: + except RestartError: + ext_handler_i.logger.info("GoalState became stale during " + "processing. Restarting with new " + "GoalState") + raise + except Exception as e: ext_handler_i.set_handler_status(message=ustr(e), code=-1) ext_handler_i.report_event(message=ustr(e), is_success=False) def handle_enable(self, ext_handler_i): + self.log_process = True old_ext_handler_i = ext_handler_i.get_installed_ext_handler() if old_ext_handler_i is not None and \ old_ext_handler_i.version_gt(ext_handler_i): @@ -341,6 +411,7 @@ class ExtHandlersHandler(object): ext_handler_i.enable() def handle_disable(self, ext_handler_i): + self.log_process = True handler_state = ext_handler_i.get_handler_state() ext_handler_i.logger.info("[Disable] current handler state is: {0}", handler_state.lower()) @@ -348,6 +419,7 @@ class ExtHandlersHandler(object): ext_handler_i.disable() def handle_uninstall(self, ext_handler_i): + self.log_process = True handler_state = ext_handler_i.get_handler_state() ext_handler_i.logger.info("[Uninstall] current handler state is: {0}", handler_state.lower()) @@ -356,7 +428,7 @@ class ExtHandlersHandler(object): ext_handler_i.disable() ext_handler_i.uninstall() ext_handler_i.rm_ext_handler_dir() - + def report_ext_handlers_status(self): """Go through handler_state dir, collect and report status""" vm_status = VMStatus(status="Ready", message="Guest Agent is running") @@ -368,6 +440,7 @@ class ExtHandlersHandler(object): add_event( AGENT_NAME, version=CURRENT_VERSION, + op=WALAEventOperation.ExtensionProcessing, is_success=False, message=ustr(e)) @@ -376,9 +449,27 @@ class ExtHandlersHandler(object): self.protocol.report_vm_status(vm_status) if self.log_report: logger.verbose("Completed vm agent status report") + self.report_status_error_state.reset() except ProtocolError as e: + self.report_status_error_state.incr() message = "Failed to report vm agent status: {0}".format(e) - add_event(AGENT_NAME, version=CURRENT_VERSION, is_success=False, message=message) + add_event(AGENT_NAME, + version=CURRENT_VERSION, + op=WALAEventOperation.ExtensionProcessing, + is_success=False, + message=message) + + if self.report_status_error_state.is_triggered(): + message = "Failed to report vm agent status for more than {0}"\ + .format(ERROR_STATE_DELTA) + + add_event(AGENT_NAME, + version=CURRENT_VERSION, + op=WALAEventOperation.ExtensionProcessing, + is_success=False, + message=message) + + self.report_status_error_state.reset() def report_ext_handler_status(self, vm_status, ext_handler): ext_handler_i = ExtHandlerInstance(ext_handler, self.protocol) @@ -386,6 +477,9 @@ class ExtHandlersHandler(object): handler_status = ext_handler_i.get_handler_status() if handler_status is None: return + guid = self.get_upgrade_guid(ext_handler.name) + if guid is not None: + handler_status.upgradeGuid = guid handler_state = ext_handler_i.get_handler_state() if handler_state != ExtHandlerState.NotInstalled: @@ -413,9 +507,7 @@ class ExtHandlerInstance(object): self.pkg = None self.pkg_file = None self.is_upgrade = False - - prefix = "[{0}]".format(self.get_full_name()) - self.logger = logger.Logger(logger.DEFAULT_LOGGER, prefix) + self.set_logger() try: fileutil.mkdir(self.get_log_dir(), mode=0o755) @@ -426,15 +518,13 @@ class ExtHandlerInstance(object): self.logger.add_appender(logger.AppenderType.FILE, logger.LogLevel.INFO, log_file) - def decide_version(self): + def decide_version(self, etag, target_state=None): self.logger.verbose("Decide which version to use") - try: - pkg_list = self.protocol.get_ext_handler_pkgs(self.ext_handler) - except ProtocolError as e: - raise ExtensionError("Failed to get ext handler pkgs", e) + pkg_list = self.protocol.get_ext_handler_pkgs(self.ext_handler, etag) # Determine the desired and installed versions - requested_version = FlexibleVersion(self.ext_handler.properties.version) + requested_version = FlexibleVersion( + str(self.ext_handler.properties.version)) installed_version_string = self.get_installed_version() installed_version = requested_version \ if installed_version_string is None \ @@ -511,13 +601,22 @@ class ExtHandlerInstance(object): # Note: # - A downgrade, which will be bound to the same major version, # is allowed if the installed version is no longer available - if selected_pkg is None \ + if target_state == u"uninstall": + if installed_pkg is None: + msg = "Failed to find installed version of {0} " \ + "to uninstall".format(self.ext_handler.name) + self.logger.warn(msg) + self.pkg = installed_pkg + self.ext_handler.properties.version = str(installed_version) \ + if installed_version is not None else None + elif selected_pkg is None \ or (installed_pkg is not None and selected_version < installed_version): self.pkg = installed_pkg - self.ext_handler.properties.version = installed_version + self.ext_handler.properties.version = str(installed_version) \ + if installed_version is not None else None else: self.pkg = selected_pkg - self.ext_handler.properties.version = selected_pkg.version + self.ext_handler.properties.version = str(selected_pkg.version) # Note if the selected package is greater than that installed if installed_pkg is None \ @@ -528,12 +627,17 @@ class ExtHandlerInstance(object): raise ExtensionError("Failed to find any valid extension package") self.logger.verbose("Use version: {0}", self.pkg.version) + self.set_logger() return + def set_logger(self): + prefix = "[{0}]".format(self.get_full_name()) + self.logger = logger.Logger(logger.DEFAULT_LOGGER, prefix) + def version_gt(self, other): self_version = self.ext_handler.properties.version other_version = other.ext_handler.properties.version - return Version(self_version) > Version(other_version) + return FlexibleVersion(self_version) > FlexibleVersion(other_version) def get_installed_ext_handler(self): lastest_version = self.get_installed_version() @@ -585,23 +689,26 @@ class ExtHandlerInstance(object): status_file = os.path.join(old_ext_status_dir, status_file) if os.path.isfile(status_file): shutil.copy2(status_file, new_ext_status_dir) - + def set_operation(self, op): self.operation = op - def report_event(self, message="", is_success=True): + def report_event(self, message="", is_success=True, duration=0): version = self.ext_handler.properties.version add_event(name=self.ext_handler.name, version=version, message=message, - op=self.operation, is_success=is_success) + op=self.operation, is_success=is_success, duration=duration) def download(self): + begin_utc = datetime.datetime.utcnow() self.logger.verbose("Download extension package") self.set_operation(WALAEventOperation.Download) if self.pkg is None: raise ExtensionError("No package uri found") package = None - for uri in self.pkg.uris: + uris_shuffled = self.pkg.uris + random.shuffle(uris_shuffled) + for uri in uris_shuffled: try: package = self.protocol.download_ext_handler_pkg(uri.uri) if package is not None: @@ -627,7 +734,8 @@ class ExtHandlerInstance(object): for file in fileutil.get_all_files(self.get_base_dir()): fileutil.chmod(file, os.stat(file).st_mode | stat.S_IXUSR) - self.report_event(message="Download succeeded") + duration = elapsed_milliseconds(begin_utc) + self.report_event(message="Download succeeded", duration=duration) self.logger.info("Initialize extension directory") #Save HandlerManifest.json @@ -649,8 +757,25 @@ class ExtHandlerInstance(object): try: status_dir = self.get_status_dir() fileutil.mkdir(status_dir, mode=0o700) + + seq_no, status_path = self.get_status_file_path() + if seq_no > -1: + now = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + status = { + "version": 1.0, + "timestampUTC" : now, + "status" : { + "name" : self.ext_handler.name, + "operation" : "Enabling Handler", + "status" : "transitioning", + "code" : 0 + } + } + fileutil.write_file(json.dumps(status), status_path) + conf_dir = self.get_conf_dir() fileutil.mkdir(conf_dir, mode=0o700) + except IOError as e: fileutil.clean_ioerror(e, paths=[self.get_base_dir(), self.pkg_file]) @@ -740,17 +865,24 @@ class ExtHandlerInstance(object): continue return seq_no + def get_status_file_path(self): + seq_no = self.get_largest_seq_no() + path = None + + if seq_no > -1: + path = os.path.join( + self.get_status_dir(), + "{0}.status".format(seq_no)) + + return seq_no, path + def collect_ext_status(self, ext): self.logger.verbose("Collect extension status") - seq_no = self.get_largest_seq_no() + seq_no, ext_status_file = self.get_status_file_path() if seq_no == -1: return None - status_dir = self.get_status_dir() - ext_status_file = "{0}.status".format(seq_no) - ext_status_file = os.path.join(status_dir, ext_status_file) - ext_status = ExtensionStatus(seq_no=seq_no) try: data_str = fileutil.read_file(ext_status_file) @@ -810,6 +942,7 @@ class ExtHandlerInstance(object): return last_update <= 600 # updated within the last 10 min def launch_command(self, cmd, timeout=300): + begin_utc = datetime.datetime.utcnow() self.logger.verbose("Launch command: [{0}]", cmd) base_dir = self.get_base_dir() try: @@ -835,7 +968,8 @@ class ExtHandlerInstance(object): if ret == None or ret != 0: raise ExtensionError("Non-zero exit code: {0}, {1}".format(ret, cmd)) - self.report_event(message="Launch command succeeded: {0}".format(cmd)) + duration = elapsed_milliseconds(begin_utc) + self.report_event(message="Launch command succeeded: {0}".format(cmd), duration=duration) def load_manifest(self): man_file = self.get_manifest_file() @@ -929,7 +1063,7 @@ class ExtHandlerInstance(object): handler_status = ExtHandlerStatus() handler_status.name = self.ext_handler.name - handler_status.version = self.ext_handler.properties.version + handler_status.version = str(self.ext_handler.properties.version) handler_status.message = message handler_status.code = code handler_status.status = status @@ -980,7 +1114,7 @@ class ExtHandlerInstance(object): def get_log_dir(self): return os.path.join(conf.get_ext_log_dir(), self.ext_handler.name, - self.ext_handler.properties.version) + str(self.ext_handler.properties.version)) class HandlerEnvironment(object): def __init__(self, data): diff --git a/azurelinuxagent/ga/monitor.py b/azurelinuxagent/ga/monitor.py index 307a514..71ac9b0 100644 --- a/azurelinuxagent/ga/monitor.py +++ b/azurelinuxagent/ga/monitor.py @@ -21,8 +21,10 @@ import os import platform import time import threading +import uuid import azurelinuxagent.common.conf as conf +import azurelinuxagent.common.utils.fileutil as fileutil import azurelinuxagent.common.logger as logger from azurelinuxagent.common.event import add_event, WALAEventOperation @@ -34,6 +36,7 @@ from azurelinuxagent.common.protocol.restapi import TelemetryEventParam, \ TelemetryEventList, \ TelemetryEvent, \ set_properties +from azurelinuxagent.common.utils.restutil import IOErrorCounter from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, getattrib from azurelinuxagent.common.version import DISTRO_NAME, DISTRO_VERSION, \ DISTRO_CODE_NAME, AGENT_LONG_VERSION, \ @@ -179,15 +182,53 @@ class MonitorHandler(object): def daemon(self): period = datetime.timedelta(minutes=30) + protocol = self.protocol_util.get_protocol() last_heartbeat = datetime.datetime.utcnow() - period + + # Create a new identifier on each restart and reset the counter + heartbeat_id = str(uuid.uuid4()).upper() + counter = 0 while True: if datetime.datetime.utcnow() >= (last_heartbeat + period): last_heartbeat = datetime.datetime.utcnow() + incarnation = protocol.get_incarnation() + dropped_packets = self.osutil.get_firewall_dropped_packets( + protocol.endpoint) + + msg = "{0};{1};{2};{3}".format( + incarnation, counter, heartbeat_id, dropped_packets) + add_event( name=AGENT_NAME, version=CURRENT_VERSION, op=WALAEventOperation.HeartBeat, - is_success=True) + is_success=True, + message=msg, + log_event=False) + + counter += 1 + + io_errors = IOErrorCounter.get_and_reset() + hostplugin_errors = io_errors.get("hostplugin") + protocol_errors = io_errors.get("protocol") + other_errors = io_errors.get("other") + + if hostplugin_errors > 0 \ + or protocol_errors > 0 \ + or other_errors > 0: + + msg = "hostplugin:{0};protocol:{1};other:{2}"\ + .format(hostplugin_errors, + protocol_errors, + other_errors) + add_event( + name=AGENT_NAME, + version=CURRENT_VERSION, + op=WALAEventOperation.HttpErrors, + is_success=True, + message=msg, + log_event=False) + try: self.collect_and_send_events() except Exception as e: diff --git a/azurelinuxagent/ga/update.py b/azurelinuxagent/ga/update.py index b7ee96a..2e43031 100644 --- a/azurelinuxagent/ga/update.py +++ b/azurelinuxagent/ga/update.py @@ -21,9 +21,11 @@ import glob import json import os import platform +import random import re import shutil import signal +import stat import subprocess import sys import time @@ -43,6 +45,7 @@ from azurelinuxagent.common.event import add_event, add_periodic, \ WALAEventOperation from azurelinuxagent.common.exception import ProtocolError, \ ResourceGoneError, \ + RestartError, \ UpdateError from azurelinuxagent.common.future import ustr from azurelinuxagent.common.osutil import get_osutil @@ -60,7 +63,7 @@ from azurelinuxagent.ga.exthandlers import HandlerManifest AGENT_ERROR_FILE = "error.json" # File name for agent error record AGENT_MANIFEST_FILE = "HandlerManifest.json" -AGENT_SUPPORTED_FILE = "supported.json" +AGENT_PARTITION_FILE = "partition" CHILD_HEALTH_INTERVAL = 15 * 60 CHILD_LAUNCH_INTERVAL = 5 * 60 @@ -71,10 +74,18 @@ MAX_FAILURE = 3 # Max failure allowed for agent before blacklisted GOAL_STATE_INTERVAL = 3 -ORPHAN_WAIT_INTERVAL = 15 * 60 * 60 +ORPHAN_WAIT_INTERVAL = 15 * 60 AGENT_SENTINAL_FILE = "current_version" +READONLY_FILE_GLOBS = [ + "*.crt", + "*.p7m", + "*.pem", + "*.prv", + "ovf-env.xml" +] + def get_update_handler(): return UpdateHandler() @@ -101,7 +112,6 @@ class UpdateHandler(object): self.child_process = None self.signal_handler = None - return def run_latest(self, child_args=None): """ @@ -233,36 +243,43 @@ class UpdateHandler(object): This is the main loop which watches for agent and extension updates. """ - logger.info(u"Agent {0} is running as the goal state agent", - CURRENT_AGENT) + try: + logger.info(u"Agent {0} is running as the goal state agent", + CURRENT_AGENT) - # Launch monitoring threads - from azurelinuxagent.ga.monitor import get_monitor_handler - get_monitor_handler().run() + # Launch monitoring threads + from azurelinuxagent.ga.monitor import get_monitor_handler + get_monitor_handler().run() - from azurelinuxagent.ga.env import get_env_handler - get_env_handler().run() + from azurelinuxagent.ga.env import get_env_handler + get_env_handler().run() - from azurelinuxagent.ga.exthandlers import get_exthandlers_handler, migrate_handler_state - exthandlers_handler = get_exthandlers_handler() - migrate_handler_state() + from azurelinuxagent.ga.exthandlers import get_exthandlers_handler, migrate_handler_state + exthandlers_handler = get_exthandlers_handler() + migrate_handler_state() - try: self._ensure_no_orphans() self._emit_restart_event() + self._ensure_partition_assigned() + self._ensure_readonly_files() while self.running: if self._is_orphaned: - logger.info("Goal state agent {0} was orphaned -- exiting", + logger.info("Agent {0} is an orphan -- exiting", CURRENT_AGENT) break if self._upgrade_available(): - if len(self.agents) > 0: + available_agent = self.get_latest_agent() + if available_agent is None: + logger.info( + "Agent {0} is reverting to the installed agent -- exiting", + CURRENT_AGENT) + else: logger.info( - u"Agent {0} discovered {1} as an update and will exit", + u"Agent {0} discovered update {1} -- exiting", CURRENT_AGENT, - self.agents[0].name) + available_agent.name) break utc_start = datetime.utcnow() @@ -271,20 +288,24 @@ class UpdateHandler(object): exthandlers_handler.run() if last_etag != exthandlers_handler.last_etag: + self._ensure_readonly_files() add_event( AGENT_NAME, version=CURRENT_VERSION, op=WALAEventOperation.ProcessGoalState, is_success=True, duration=elapsed_milliseconds(utc_start), + message="Incarnation {0}".format( + exthandlers_handler.last_etag), log_event=True) time.sleep(GOAL_STATE_INTERVAL) except Exception as e: - logger.warn(u"Agent {0} failed with exception: {1}", - CURRENT_AGENT, - ustr(e)) + msg = u"Agent {0} failed with exception: {1}".format( + CURRENT_AGENT, ustr(e)) + self._set_sentinal(msg=msg) + logger.warn(msg) logger.warn(traceback.format_exc()) sys.exit(1) # additional return here because sys.exit is mocked in unit tests @@ -338,17 +359,21 @@ class UpdateHandler(object): return available_agents[0] if len(available_agents) >= 1 else None def _emit_restart_event(self): - if not self._is_clean_start: - msg = u"{0} did not terminate cleanly".format(CURRENT_AGENT) - logger.info(msg) - add_event( - AGENT_NAME, - version=CURRENT_VERSION, - op=WALAEventOperation.Restart, - is_success=False, - message=msg) + try: + if not self._is_clean_start: + msg = u"Agent did not terminate cleanly: {0}".format( + fileutil.read_file(self._sentinal_file_path())) + logger.info(msg) + add_event( + AGENT_NAME, + version=CURRENT_VERSION, + op=WALAEventOperation.Restart, + is_success=False, + message=msg) + except Exception: + pass - self._set_sentinal() + self._set_sentinal(msg="Starting") return def _ensure_no_orphans(self, orphan_wait_interval=ORPHAN_WAIT_INTERVAL): @@ -382,6 +407,26 @@ class UpdateHandler(object): ustr(e)) return + def _ensure_partition_assigned(self): + """ + Assign the VM to a partition (0 - 99). Downloaded updates may be configured + to run on only some VMs; the assigned partition determines eligibility. + """ + if not os.path.exists(self._partition_file): + partition = ustr(int(datetime.utcnow().microsecond / 10000)) + fileutil.write_file(self._partition_file, partition) + add_event( + AGENT_NAME, + version=CURRENT_VERSION, + op=WALAEventOperation.Partition, + is_success=True, + message=partition) + + def _ensure_readonly_files(self): + for g in READONLY_FILE_GLOBS: + for path in glob.iglob(os.path.join(conf.get_lib_dir(), g)): + os.chmod(path, stat.S_IRUSR) + def _evaluate_agent_health(self, latest_agent): """ Evaluate the health of the selected agent: If it is restarting @@ -412,7 +457,6 @@ class UpdateHandler(object): def _filter_blacklisted_agents(self): self.agents = [agent for agent in self.agents if not agent.is_blacklisted] - return def _find_agents(self): """ @@ -447,19 +491,7 @@ class UpdateHandler(object): @property def _is_clean_start(self): - if not os.path.isfile(self._sentinal_file_path()): - return True - - try: - if fileutil.read_file(self._sentinal_file_path()) != CURRENT_AGENT: - return True - except Exception as e: - logger.warn( - u"Exception reading sentinal file {0}: {1}", - self._sentinal_file_path(), - str(e)) - - return False + return not os.path.isfile(self._sentinal_file_path()) @property def _is_orphaned(self): @@ -472,11 +504,29 @@ class UpdateHandler(object): return fileutil.read_file(conf.get_agent_pid_file_path()) != ustr(parent_pid) + def _is_version_eligible(self, version): + # Ensure the installed version is always eligible + if version == CURRENT_VERSION and is_current_agent_installed(): + return True + + for agent in self.agents: + if agent.version == version: + return agent.is_available + + return False + def _load_agents(self): path = os.path.join(conf.get_lib_dir(), "{0}-*".format(AGENT_NAME)) return [GuestAgent(path=agent_dir) for agent_dir in glob.iglob(path) if os.path.isdir(agent_dir)] + def _partition(self): + return int(fileutil.read_file(self._partition_file)) + + @property + def _partition_file(self): + return os.path.join(conf.get_lib_dir(), AGENT_PARTITION_FILE) + def _purge_agents(self): """ Remove from disk all directories and .zip files of unknown agents @@ -485,8 +535,8 @@ class UpdateHandler(object): path = os.path.join(conf.get_lib_dir(), "{0}-*".format(AGENT_NAME)) known_versions = [agent.version for agent in self.agents] - if not is_current_agent_installed() and CURRENT_VERSION not in known_versions: - logger.warn( + if CURRENT_VERSION not in known_versions: + logger.info( u"Running Agent {0} was not found in the agent manifest - adding to list", CURRENT_VERSION) known_versions.append(CURRENT_VERSION) @@ -511,9 +561,11 @@ class UpdateHandler(object): self.agents.sort(key=lambda agent: agent.version, reverse=True) return - def _set_sentinal(self, agent=CURRENT_AGENT): + def _set_sentinal(self, agent=CURRENT_AGENT, msg="Unknown cause"): try: - fileutil.write_file(self._sentinal_file_path(), agent) + fileutil.write_file( + self._sentinal_file_path(), + "[{0}] [{1}]".format(agent, msg)) except Exception as e: logger.warn( u"Exception writing sentinal file {0}: {1}", @@ -599,16 +651,18 @@ class UpdateHandler(object): self._purge_agents() self._filter_blacklisted_agents() - # Return True if more recent agents are available - return len(self.agents) > 0 and \ - self.agents[0].version > base_version + # Return True if current agent is no longer available or an + # agent with a higher version number is available + return not self._is_version_eligible(base_version) \ + or (len(self.agents) > 0 \ + and self.agents[0].version > base_version) except Exception as e: if isinstance(e, ResourceGoneError): continue msg = u"Exception retrieving agent manifests: {0}".format( - ustr(e)) + ustr(traceback.format_exc())) logger.warn(msg) add_event( AGENT_NAME, @@ -667,8 +721,6 @@ class GuestAgent(object): self.error = GuestAgentError(self.get_agent_error_file()) self.error.load() - self.supported = Supported(self.get_agent_supported_file()) - self.supported.load() try: self._ensure_downloaded() @@ -677,6 +729,13 @@ class GuestAgent(object): if isinstance(e, ResourceGoneError): raise + # The agent was improperly blacklisting versions due to a timeout + # encountered while downloading a later version. Errors of type + # socket.error are IOError, so this should provide sufficient + # protection against a large class of I/O operation failures. + if isinstance(e, IOError): + raise + # Note the failure, blacklist the agent if the package downloaded # - An exception with a downloaded package indicates the package # is corrupt (e.g., missing the HandlerManifest.json file) @@ -691,7 +750,6 @@ class GuestAgent(object): op=WALAEventOperation.Install, is_success=False, message=msg) - return @property def name(self): @@ -712,13 +770,9 @@ class GuestAgent(object): def get_agent_pkg_path(self): return ".".join((os.path.join(conf.get_lib_dir(), self.name), "zip")) - def get_agent_supported_file(self): - return os.path.join(conf.get_lib_dir(), self.name, AGENT_SUPPORTED_FILE) - def clear_error(self): self.error.clear() self.error.save() - return @property def is_available(self): @@ -730,15 +784,8 @@ class GuestAgent(object): @property def is_downloaded(self): - return self.is_blacklisted or os.path.isfile(self.get_agent_manifest_path()) - - @property - def _is_optional(self): - return self.error is not None and self.error.is_sentinel and self.supported.is_supported - - @property - def _in_slice(self): - return self.supported.is_supported and self.supported.in_slice + return self.is_blacklisted or \ + os.path.isfile(self.get_agent_manifest_path()) def mark_failure(self, is_fatal=False): try: @@ -750,21 +797,6 @@ class GuestAgent(object): logger.warn(u"Agent {0} is permanently blacklisted", self.name) except Exception as e: logger.warn(u"Agent {0} failed recording error state: {1}", self.name, ustr(e)) - return - - def _enable(self): - # Enable optional agents if within the "slice" - # - The "slice" is a percentage of the agent to execute - # - Blacklist out-of-slice agents to prevent reconsideration - if self._is_optional: - if self._in_slice: - self.error.clear() - self.error.save() - logger.info(u"Enabled optional Agent {0}", self.name) - else: - self.mark_failure(is_fatal=True) - logger.info(u"Optional Agent {0} not in slice", self.name) - return def _ensure_downloaded(self): logger.verbose(u"Ensuring Agent {0} is downloaded", self.name) @@ -788,18 +820,15 @@ class GuestAgent(object): op=WALAEventOperation.Install, is_success=True, message=msg) - return def _ensure_loaded(self): self._load_manifest() self._load_error() - self._load_supported() - - self._enable() - return def _download(self): - for uri in self.pkg.uris: + uris_shuffled = self.pkg.uris + random.shuffle(uris_shuffled) + for uri in uris_shuffled: if not HostPluginProtocol.is_default_channel() and self._fetch(uri.uri): break @@ -838,8 +867,6 @@ class GuestAgent(object): message=msg) raise UpdateError(msg) - return - def _fetch(self, uri, headers=None, use_proxy=True): package = None try: @@ -871,14 +898,6 @@ class GuestAgent(object): logger.verbose(u"Agent {0} error state: {1}", self.name, ustr(self.error)) except Exception as e: logger.warn(u"Agent {0} failed loading error state: {1}", self.name, ustr(e)) - return - - def _load_supported(self): - try: - self.supported = Supported(self.get_agent_supported_file()) - self.supported.load() - except Exception as e: - self.supported = Supported() def _load_manifest(self): path = self.get_agent_manifest_path() @@ -976,10 +995,6 @@ class GuestAgentError(object): def is_blacklisted(self): return self.was_fatal or self.failure_count >= MAX_FAILURE - @property - def is_sentinel(self): - return self.was_fatal and self.last_failure == 0.0 - def load(self): if self.path is not None and os.path.isfile(self.path): with open(self.path, 'r') as f: @@ -1015,60 +1030,3 @@ class GuestAgentError(object): self.last_failure, self.failure_count, self.was_fatal) - -class Supported(object): - def __init__(self, path): - if path is None: - raise UpdateError(u"Supported requires a path") - self.path = path - self.distributions = {} - return - - @property - def is_supported(self): - return self._supported_distribution is not None - - @property - def in_slice(self): - d = self._supported_distribution - return d is not None and d.in_slice - - def load(self): - self.distributions = {} - try: - if self.path is not None and os.path.isfile(self.path): - j = json.loads(fileutil.read_file(self.path)) - for d in j: - self.distributions[d] = SupportedDistribution(j[d]) - except Exception as e: - logger.warn("Failed JSON parse of {0}: {1}".format(self.path, e)) - return - - @property - def _supported_distribution(self): - for d in self.distributions: - dd = self.distributions[d] - if dd.is_supported: - return dd - return None - -class SupportedDistribution(object): - def __init__(self, s): - if s is None or not isinstance(s, dict): - raise UpdateError(u"SupportedDisribution requires a dictionary") - - self.slice = s['slice'] - self.versions = s['versions'] - - @property - def is_supported(self): - d = ','.join(platform.linux_distribution()) - for v in self.versions: - if re.match(v, d): - return True - return False - - @property - def in_slice(self): - n = int((60 * self.slice) / 100) - return (n - datetime.utcnow().second) > 0 |