summaryrefslogtreecommitdiff
path: root/azurelinuxagent/ga
diff options
context:
space:
mode:
Diffstat (limited to 'azurelinuxagent/ga')
-rw-r--r--azurelinuxagent/ga/env.py147
-rw-r--r--azurelinuxagent/ga/exthandlers.py236
-rw-r--r--azurelinuxagent/ga/monitor.py43
-rw-r--r--azurelinuxagent/ga/update.py280
4 files changed, 462 insertions, 244 deletions
diff --git a/azurelinuxagent/ga/env.py b/azurelinuxagent/ga/env.py
index 45b10bb..d9b7d82 100644
--- a/azurelinuxagent/ga/env.py
+++ b/azurelinuxagent/ga/env.py
@@ -17,11 +17,16 @@
# Requires Python 2.4+ and Openssl 1.0+
#
+import re
import os
import socket
import time
import threading
+import operator
+
+import datetime
+
import azurelinuxagent.common.conf as conf
import azurelinuxagent.common.logger as logger
@@ -29,15 +34,29 @@ from azurelinuxagent.common.dhcp import get_dhcp_handler
from azurelinuxagent.common.event import add_periodic, WALAEventOperation
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.protocol import get_protocol_util
+from azurelinuxagent.common.protocol.wire import INCARNATION_FILE_NAME
+from azurelinuxagent.common.utils import fileutil
from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION
+CACHE_PATTERNS = [
+ re.compile("^(.*)\.(\d+)\.(agentsManifest)$", re.IGNORECASE),
+ re.compile("^(.*)\.(\d+)\.(manifest\.xml)$", re.IGNORECASE),
+ re.compile("^(.*)\.(\d+)\.(xml)$", re.IGNORECASE)
+]
+
+MAXIMUM_CACHED_FILES = 50
+
+CACHE_PURGE_INTERVAL = datetime.timedelta(hours=24)
+
+
def get_env_handler():
return EnvHandler()
+
class EnvHandler(object):
"""
Monitor changes to dhcp and hostname.
- If dhcp clinet process re-start has occurred, reset routes, dhcp with fabric.
+ If dhcp client process re-start has occurred, reset routes, dhcp with fabric.
Monitor scsi disk.
If new scsi disk found, set timeout
@@ -48,9 +67,10 @@ class EnvHandler(object):
self.protocol_util = get_protocol_util()
self.stopped = True
self.hostname = None
- self.dhcpid = None
+ self.dhcp_id = None
self.server_thread = None
self.dhcp_warning_enabled = True
+ self.last_purge = None
def run(self):
if not self.stopped:
@@ -61,7 +81,7 @@ class EnvHandler(object):
logger.info("Start env monitor service.")
self.dhcp_handler.conf_routes()
self.hostname = self.osutil.get_hostname_record()
- self.dhcpid = self.osutil.get_dhcp_pid()
+ self.dhcp_id = self.osutil.get_dhcp_pid()
self.server_thread = threading.Thread(target=self.monitor)
self.server_thread.setDaemon(True)
self.server_thread.start()
@@ -70,26 +90,24 @@ class EnvHandler(object):
"""
Monitor firewall rules
Monitor dhcp client pid and hostname.
- If dhcp clinet process re-start has occurred, reset routes.
+ If dhcp client process re-start has occurred, reset routes.
+ Purge unnecessary files from disk cache.
"""
protocol = self.protocol_util.get_protocol()
while not self.stopped:
self.osutil.remove_rules_files()
- # Disable setting firewall for now, regardless of configuration switch
- # if conf.enable_firewall():
- # success = self.osutil.enable_firewall(
- # dst_ip=protocol.endpoint,
- # uid=os.getuid())
- # add_periodic(
- # logger.EVERY_HOUR,
- # AGENT_NAME,
- # version=CURRENT_VERSION,
- # op=WALAEventOperation.Firewall,
- # is_success=success,
- # log_event=True)
-
- self.osutil.remove_firewall()
+ if conf.enable_firewall():
+ success = self.osutil.enable_firewall(
+ dst_ip=protocol.endpoint,
+ uid=os.getuid())
+ add_periodic(
+ logger.EVERY_HOUR,
+ AGENT_NAME,
+ version=CURRENT_VERSION,
+ op=WALAEventOperation.Firewall,
+ is_success=success,
+ log_event=False)
timeout = conf.get_root_device_scsi_timeout()
if timeout is not None:
@@ -100,6 +118,8 @@ class EnvHandler(object):
self.handle_dhclient_restart()
+ self.purge_disk_cache()
+
time.sleep(5)
def handle_hostname_update(self):
@@ -113,30 +133,95 @@ class EnvHandler(object):
self.hostname = curr_hostname
def handle_dhclient_restart(self):
- if self.dhcpid is None:
+ if self.dhcp_id is None:
if self.dhcp_warning_enabled:
logger.warn("Dhcp client is not running. ")
- self.dhcpid = self.osutil.get_dhcp_pid()
+ self.dhcp_id = self.osutil.get_dhcp_pid()
# disable subsequent error logging
- self.dhcp_warning_enabled = self.dhcpid is not None
+ self.dhcp_warning_enabled = self.dhcp_id is not None
+ return
+
+ # the dhcp process has not changed since the last check
+ if self.osutil.check_pid_alive(self.dhcp_id.strip()):
return
- #The dhcp process hasn't changed since last check
- if self.osutil.check_pid_alive(self.dhcpid.strip()):
+ new_pid = self.osutil.get_dhcp_pid()
+ if new_pid is not None and new_pid != self.dhcp_id:
+ logger.info("EnvMonitor: Detected dhcp client restart. "
+ "Restoring routing table.")
+ self.dhcp_handler.conf_routes()
+ self.dhcp_id = new_pid
+
+ def purge_disk_cache(self):
+ """
+ Ensure the number of cached files does not exceed a maximum count.
+ Purge only once per interval, and never delete files related to the
+ current incarnation.
+ """
+ if self.last_purge is not None \
+ and datetime.datetime.utcnow() < \
+ self.last_purge + CACHE_PURGE_INTERVAL:
+ return
+
+ current_incarnation = -1
+ self.last_purge = datetime.datetime.utcnow()
+ incarnation_file = os.path.join(conf.get_lib_dir(),
+ INCARNATION_FILE_NAME)
+ if os.path.exists(incarnation_file):
+ last_incarnation = fileutil.read_file(incarnation_file)
+ if last_incarnation is not None:
+ current_incarnation = int(last_incarnation)
+
+ logger.info("Purging disk cache, current incarnation is {0}"
+ .format('not found'
+ if current_incarnation == -1
+ else current_incarnation))
+
+ # Create tuples: (prefix, suffix, incarnation, name, file_modified)
+ files = []
+ for f in os.listdir(conf.get_lib_dir()):
+ full_path = os.path.join(conf.get_lib_dir(), f)
+ for pattern in CACHE_PATTERNS:
+ m = pattern.match(f)
+ if m is not None:
+ prefix = m.group(1)
+ suffix = m.group(3)
+ incarnation = int(m.group(2))
+ file_modified = os.path.getmtime(full_path)
+ t = (prefix, suffix, incarnation, f, file_modified)
+ files.append(t)
+ break
+
+ if len(files) <= 0:
return
- newpid = self.osutil.get_dhcp_pid()
- if newpid is not None and newpid != self.dhcpid:
- logger.info("EnvMonitor: Detected dhcp client restart. "
- "Restoring routing table.")
- self.dhcp_handler.conf_routes()
- self.dhcpid = newpid
+ # Sort by (prefix, suffix, file_modified) in reverse order
+ files = sorted(files, key=operator.itemgetter(0, 1, 4), reverse=True)
+
+ # Remove any files in excess of the maximum allowed
+ # -- Restart then whenever the (prefix, suffix) change
+ count = 0
+ last_match = [None, None]
+ for f in files:
+ if last_match != f[0:2]:
+ last_match = f[0:2]
+ count = 0
+
+ if current_incarnation == f[2]:
+ logger.verbose("Skipping {0}".format(f[3]))
+ continue
+
+ count += 1
+
+ if count > MAXIMUM_CACHED_FILES:
+ full_name = os.path.join(conf.get_lib_dir(), f[3])
+ logger.verbose("Deleting {0}".format(full_name))
+ os.remove(full_name)
def stop(self):
"""
- Stop server comminucation and join the thread to main thread.
+ Stop server communication and join the thread to main thread.
"""
self.stopped = True
if self.server_thread is not None:
self.server_thread.join()
-
diff --git a/azurelinuxagent/ga/exthandlers.py b/azurelinuxagent/ga/exthandlers.py
index f0a3b09..cc1796b 100644
--- a/azurelinuxagent/ga/exthandlers.py
+++ b/azurelinuxagent/ga/exthandlers.py
@@ -17,10 +17,12 @@
# Requires Python 2.4+ and Openssl 1.0+
#
+import datetime
import glob
import json
import os
import os.path
+import random
import re
import shutil
import stat
@@ -31,25 +33,22 @@ import zipfile
import azurelinuxagent.common.conf as conf
import azurelinuxagent.common.logger as logger
import azurelinuxagent.common.utils.fileutil as fileutil
-import azurelinuxagent.common.utils.restutil as restutil
-import azurelinuxagent.common.utils.shellutil as shellutil
import azurelinuxagent.common.version as version
+from azurelinuxagent.common.errorstate import ErrorState, ERROR_STATE_DELTA
-from azurelinuxagent.common.event import add_event, WALAEventOperation
-from azurelinuxagent.common.exception import ExtensionError, ProtocolError, HttpError
+from azurelinuxagent.common.event import add_event, WALAEventOperation, elapsed_milliseconds
+from azurelinuxagent.common.exception import ExtensionError, ProtocolError, RestartError
from azurelinuxagent.common.future import ustr
-from azurelinuxagent.common.version import AGENT_VERSION
from azurelinuxagent.common.protocol.restapi import ExtHandlerStatus, \
ExtensionStatus, \
ExtensionSubStatus, \
- Extension, \
VMStatus, ExtHandler, \
get_properties, \
set_properties
from azurelinuxagent.common.utils.flexible_version import FlexibleVersion
-from azurelinuxagent.common.utils.textutil import Version
from azurelinuxagent.common.protocol import get_protocol_util
-from azurelinuxagent.common.version import AGENT_NAME, CURRENT_AGENT, CURRENT_VERSION
+from azurelinuxagent.common.version import AGENT_NAME, CURRENT_VERSION
+
#HandlerEnvironment.json schema version
HANDLER_ENVIRONMENT_VERSION = 1.0
@@ -64,7 +63,6 @@ HANDLER_PKG_EXT = ".zip"
HANDLER_PKG_PATTERN = re.compile(HANDLER_PATTERN+"\\"+HANDLER_PKG_EXT+"$",
re.IGNORECASE)
-
def validate_has_key(obj, key, fullname):
if key not in obj:
raise ExtensionError("Missing: {0}".format(fullname))
@@ -176,35 +174,73 @@ class ExtHandlersHandler(object):
self.protocol = None
self.ext_handlers = None
self.last_etag = None
+ self.last_upgrade_guids = {}
self.log_report = False
self.log_etag = True
+ self.log_process = False
+
+ self.report_status_error_state = ErrorState()
def run(self):
self.ext_handlers, etag = None, None
try:
self.protocol = self.protocol_util.get_protocol()
self.ext_handlers, etag = self.protocol.get_ext_handlers()
- except ProtocolError as e:
+ except Exception as e:
msg = u"Exception retrieving extension handlers: {0}".format(
ustr(e))
logger.warn(msg)
- add_event(AGENT_NAME, version=CURRENT_VERSION, is_success=False, message=msg)
+ add_event(AGENT_NAME,
+ version=CURRENT_VERSION,
+ op=WALAEventOperation.ExtensionProcessing,
+ is_success=False,
+ message=msg)
return
- msg = u"Handle extensions updates for incarnation {0}".format(etag)
- logger.verbose(msg)
- # Log status report success on new config
- self.log_report = True
- self.handle_ext_handlers(etag)
- self.last_etag = etag
-
- self.report_ext_handlers_status()
- self.cleanup_outdated_handlers()
+ try:
+ msg = u"Handle extensions updates for incarnation {0}".format(etag)
+ logger.verbose(msg)
+ # Log status report success on new config
+ self.log_report = True
+ self.handle_ext_handlers(etag)
+ self.last_etag = etag
+
+ self.report_ext_handlers_status()
+ self.cleanup_outdated_handlers()
+ except RestartError:
+ raise
+ except Exception as e:
+ msg = u"Exception processing extension handlers: {0}".format(
+ ustr(e))
+ logger.warn(msg)
+ add_event(AGENT_NAME,
+ version=CURRENT_VERSION,
+ op=WALAEventOperation.ExtensionProcessing,
+ is_success=False,
+ message=msg)
+ return
def run_status(self):
self.report_ext_handlers_status()
return
+ def get_upgrade_guid(self, name):
+ return self.last_upgrade_guids.get(name, (None, False))[0]
+
+ def get_log_upgrade_guid(self, ext_handler):
+ return self.last_upgrade_guids.get(ext_handler.name, (None, False))[1]
+
+ def set_log_upgrade_guid(self, ext_handler, log_val):
+ guid = self.get_upgrade_guid(ext_handler.name)
+ if guid is not None:
+ self.last_upgrade_guids[ext_handler.name] = (guid, log_val)
+
+ def is_new_guid(self, ext_handler):
+ last_guid = self.get_upgrade_guid(ext_handler.name)
+ if last_guid is None:
+ return True
+ return last_guid != ext_handler.properties.upgradeGuid
+
def cleanup_outdated_handlers(self):
handlers = []
pkgs = []
@@ -288,7 +324,29 @@ class ExtHandlersHandler(object):
ext_handler_i = ExtHandlerInstance(ext_handler, self.protocol)
try:
- ext_handler_i.decide_version()
+ state = ext_handler.properties.state
+ # The extension is to be enabled, there is an upgrade GUID
+ # and the GUID is NOT new
+ if state == u"enabled" and \
+ ext_handler.properties.upgradeGuid is not None and \
+ not self.is_new_guid(ext_handler):
+ ext_handler_i.ext_handler.properties.version = ext_handler_i.get_installed_version()
+ ext_handler_i.set_logger()
+ if self.last_etag != etag:
+ self.set_log_upgrade_guid(ext_handler, True)
+
+ msg = "New GUID is the same as the old GUID. Exiting without upgrading."
+ if self.get_log_upgrade_guid(ext_handler):
+ ext_handler_i.logger.info(msg)
+ self.set_log_upgrade_guid(ext_handler, False)
+ ext_handler_i.set_handler_state(ExtHandlerState.Enabled)
+ ext_handler_i.set_handler_status(status="Ready", message="No change")
+ ext_handler_i.set_operation(WALAEventOperation.SkipUpdate)
+ ext_handler_i.report_event(message=ustr(msg), is_success=True)
+ return
+
+ self.set_log_upgrade_guid(ext_handler, True)
+ ext_handler_i.decide_version(etag=etag, target_state=state)
if not ext_handler_i.is_upgrade and self.last_etag == etag:
if self.log_etag:
ext_handler_i.logger.verbose("Version {0} is current for etag {1}",
@@ -299,22 +357,34 @@ class ExtHandlersHandler(object):
self.log_etag = True
- state = ext_handler.properties.state
ext_handler_i.logger.info("Target handler state: {0}", state)
if state == u"enabled":
self.handle_enable(ext_handler_i)
+ if ext_handler.properties.upgradeGuid is not None:
+ ext_handler_i.logger.info("New Upgrade GUID: {0}", ext_handler.properties.upgradeGuid)
+ self.last_upgrade_guids[ext_handler.name] = (ext_handler.properties.upgradeGuid, True)
elif state == u"disabled":
self.handle_disable(ext_handler_i)
+ # Remove the GUID from the dictionary so that it is upgraded upon re-enable
+ self.last_upgrade_guids.pop(ext_handler.name, None)
elif state == u"uninstall":
self.handle_uninstall(ext_handler_i)
+ # Remove the GUID from the dictionary so that it is upgraded upon re-install
+ self.last_upgrade_guids.pop(ext_handler.name, None)
else:
message = u"Unknown ext handler state:{0}".format(state)
raise ExtensionError(message)
- except ExtensionError as e:
+ except RestartError:
+ ext_handler_i.logger.info("GoalState became stale during "
+ "processing. Restarting with new "
+ "GoalState")
+ raise
+ except Exception as e:
ext_handler_i.set_handler_status(message=ustr(e), code=-1)
ext_handler_i.report_event(message=ustr(e), is_success=False)
def handle_enable(self, ext_handler_i):
+ self.log_process = True
old_ext_handler_i = ext_handler_i.get_installed_ext_handler()
if old_ext_handler_i is not None and \
old_ext_handler_i.version_gt(ext_handler_i):
@@ -341,6 +411,7 @@ class ExtHandlersHandler(object):
ext_handler_i.enable()
def handle_disable(self, ext_handler_i):
+ self.log_process = True
handler_state = ext_handler_i.get_handler_state()
ext_handler_i.logger.info("[Disable] current handler state is: {0}",
handler_state.lower())
@@ -348,6 +419,7 @@ class ExtHandlersHandler(object):
ext_handler_i.disable()
def handle_uninstall(self, ext_handler_i):
+ self.log_process = True
handler_state = ext_handler_i.get_handler_state()
ext_handler_i.logger.info("[Uninstall] current handler state is: {0}",
handler_state.lower())
@@ -356,7 +428,7 @@ class ExtHandlersHandler(object):
ext_handler_i.disable()
ext_handler_i.uninstall()
ext_handler_i.rm_ext_handler_dir()
-
+
def report_ext_handlers_status(self):
"""Go through handler_state dir, collect and report status"""
vm_status = VMStatus(status="Ready", message="Guest Agent is running")
@@ -368,6 +440,7 @@ class ExtHandlersHandler(object):
add_event(
AGENT_NAME,
version=CURRENT_VERSION,
+ op=WALAEventOperation.ExtensionProcessing,
is_success=False,
message=ustr(e))
@@ -376,9 +449,27 @@ class ExtHandlersHandler(object):
self.protocol.report_vm_status(vm_status)
if self.log_report:
logger.verbose("Completed vm agent status report")
+ self.report_status_error_state.reset()
except ProtocolError as e:
+ self.report_status_error_state.incr()
message = "Failed to report vm agent status: {0}".format(e)
- add_event(AGENT_NAME, version=CURRENT_VERSION, is_success=False, message=message)
+ add_event(AGENT_NAME,
+ version=CURRENT_VERSION,
+ op=WALAEventOperation.ExtensionProcessing,
+ is_success=False,
+ message=message)
+
+ if self.report_status_error_state.is_triggered():
+ message = "Failed to report vm agent status for more than {0}"\
+ .format(ERROR_STATE_DELTA)
+
+ add_event(AGENT_NAME,
+ version=CURRENT_VERSION,
+ op=WALAEventOperation.ExtensionProcessing,
+ is_success=False,
+ message=message)
+
+ self.report_status_error_state.reset()
def report_ext_handler_status(self, vm_status, ext_handler):
ext_handler_i = ExtHandlerInstance(ext_handler, self.protocol)
@@ -386,6 +477,9 @@ class ExtHandlersHandler(object):
handler_status = ext_handler_i.get_handler_status()
if handler_status is None:
return
+ guid = self.get_upgrade_guid(ext_handler.name)
+ if guid is not None:
+ handler_status.upgradeGuid = guid
handler_state = ext_handler_i.get_handler_state()
if handler_state != ExtHandlerState.NotInstalled:
@@ -413,9 +507,7 @@ class ExtHandlerInstance(object):
self.pkg = None
self.pkg_file = None
self.is_upgrade = False
-
- prefix = "[{0}]".format(self.get_full_name())
- self.logger = logger.Logger(logger.DEFAULT_LOGGER, prefix)
+ self.set_logger()
try:
fileutil.mkdir(self.get_log_dir(), mode=0o755)
@@ -426,15 +518,13 @@ class ExtHandlerInstance(object):
self.logger.add_appender(logger.AppenderType.FILE,
logger.LogLevel.INFO, log_file)
- def decide_version(self):
+ def decide_version(self, etag, target_state=None):
self.logger.verbose("Decide which version to use")
- try:
- pkg_list = self.protocol.get_ext_handler_pkgs(self.ext_handler)
- except ProtocolError as e:
- raise ExtensionError("Failed to get ext handler pkgs", e)
+ pkg_list = self.protocol.get_ext_handler_pkgs(self.ext_handler, etag)
# Determine the desired and installed versions
- requested_version = FlexibleVersion(self.ext_handler.properties.version)
+ requested_version = FlexibleVersion(
+ str(self.ext_handler.properties.version))
installed_version_string = self.get_installed_version()
installed_version = requested_version \
if installed_version_string is None \
@@ -511,13 +601,22 @@ class ExtHandlerInstance(object):
# Note:
# - A downgrade, which will be bound to the same major version,
# is allowed if the installed version is no longer available
- if selected_pkg is None \
+ if target_state == u"uninstall":
+ if installed_pkg is None:
+ msg = "Failed to find installed version of {0} " \
+ "to uninstall".format(self.ext_handler.name)
+ self.logger.warn(msg)
+ self.pkg = installed_pkg
+ self.ext_handler.properties.version = str(installed_version) \
+ if installed_version is not None else None
+ elif selected_pkg is None \
or (installed_pkg is not None and selected_version < installed_version):
self.pkg = installed_pkg
- self.ext_handler.properties.version = installed_version
+ self.ext_handler.properties.version = str(installed_version) \
+ if installed_version is not None else None
else:
self.pkg = selected_pkg
- self.ext_handler.properties.version = selected_pkg.version
+ self.ext_handler.properties.version = str(selected_pkg.version)
# Note if the selected package is greater than that installed
if installed_pkg is None \
@@ -528,12 +627,17 @@ class ExtHandlerInstance(object):
raise ExtensionError("Failed to find any valid extension package")
self.logger.verbose("Use version: {0}", self.pkg.version)
+ self.set_logger()
return
+ def set_logger(self):
+ prefix = "[{0}]".format(self.get_full_name())
+ self.logger = logger.Logger(logger.DEFAULT_LOGGER, prefix)
+
def version_gt(self, other):
self_version = self.ext_handler.properties.version
other_version = other.ext_handler.properties.version
- return Version(self_version) > Version(other_version)
+ return FlexibleVersion(self_version) > FlexibleVersion(other_version)
def get_installed_ext_handler(self):
lastest_version = self.get_installed_version()
@@ -585,23 +689,26 @@ class ExtHandlerInstance(object):
status_file = os.path.join(old_ext_status_dir, status_file)
if os.path.isfile(status_file):
shutil.copy2(status_file, new_ext_status_dir)
-
+
def set_operation(self, op):
self.operation = op
- def report_event(self, message="", is_success=True):
+ def report_event(self, message="", is_success=True, duration=0):
version = self.ext_handler.properties.version
add_event(name=self.ext_handler.name, version=version, message=message,
- op=self.operation, is_success=is_success)
+ op=self.operation, is_success=is_success, duration=duration)
def download(self):
+ begin_utc = datetime.datetime.utcnow()
self.logger.verbose("Download extension package")
self.set_operation(WALAEventOperation.Download)
if self.pkg is None:
raise ExtensionError("No package uri found")
package = None
- for uri in self.pkg.uris:
+ uris_shuffled = self.pkg.uris
+ random.shuffle(uris_shuffled)
+ for uri in uris_shuffled:
try:
package = self.protocol.download_ext_handler_pkg(uri.uri)
if package is not None:
@@ -627,7 +734,8 @@ class ExtHandlerInstance(object):
for file in fileutil.get_all_files(self.get_base_dir()):
fileutil.chmod(file, os.stat(file).st_mode | stat.S_IXUSR)
- self.report_event(message="Download succeeded")
+ duration = elapsed_milliseconds(begin_utc)
+ self.report_event(message="Download succeeded", duration=duration)
self.logger.info("Initialize extension directory")
#Save HandlerManifest.json
@@ -649,8 +757,25 @@ class ExtHandlerInstance(object):
try:
status_dir = self.get_status_dir()
fileutil.mkdir(status_dir, mode=0o700)
+
+ seq_no, status_path = self.get_status_file_path()
+ if seq_no > -1:
+ now = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
+ status = {
+ "version": 1.0,
+ "timestampUTC" : now,
+ "status" : {
+ "name" : self.ext_handler.name,
+ "operation" : "Enabling Handler",
+ "status" : "transitioning",
+ "code" : 0
+ }
+ }
+ fileutil.write_file(json.dumps(status), status_path)
+
conf_dir = self.get_conf_dir()
fileutil.mkdir(conf_dir, mode=0o700)
+
except IOError as e:
fileutil.clean_ioerror(e,
paths=[self.get_base_dir(), self.pkg_file])
@@ -740,17 +865,24 @@ class ExtHandlerInstance(object):
continue
return seq_no
+ def get_status_file_path(self):
+ seq_no = self.get_largest_seq_no()
+ path = None
+
+ if seq_no > -1:
+ path = os.path.join(
+ self.get_status_dir(),
+ "{0}.status".format(seq_no))
+
+ return seq_no, path
+
def collect_ext_status(self, ext):
self.logger.verbose("Collect extension status")
- seq_no = self.get_largest_seq_no()
+ seq_no, ext_status_file = self.get_status_file_path()
if seq_no == -1:
return None
- status_dir = self.get_status_dir()
- ext_status_file = "{0}.status".format(seq_no)
- ext_status_file = os.path.join(status_dir, ext_status_file)
-
ext_status = ExtensionStatus(seq_no=seq_no)
try:
data_str = fileutil.read_file(ext_status_file)
@@ -810,6 +942,7 @@ class ExtHandlerInstance(object):
return last_update <= 600 # updated within the last 10 min
def launch_command(self, cmd, timeout=300):
+ begin_utc = datetime.datetime.utcnow()
self.logger.verbose("Launch command: [{0}]", cmd)
base_dir = self.get_base_dir()
try:
@@ -835,7 +968,8 @@ class ExtHandlerInstance(object):
if ret == None or ret != 0:
raise ExtensionError("Non-zero exit code: {0}, {1}".format(ret, cmd))
- self.report_event(message="Launch command succeeded: {0}".format(cmd))
+ duration = elapsed_milliseconds(begin_utc)
+ self.report_event(message="Launch command succeeded: {0}".format(cmd), duration=duration)
def load_manifest(self):
man_file = self.get_manifest_file()
@@ -929,7 +1063,7 @@ class ExtHandlerInstance(object):
handler_status = ExtHandlerStatus()
handler_status.name = self.ext_handler.name
- handler_status.version = self.ext_handler.properties.version
+ handler_status.version = str(self.ext_handler.properties.version)
handler_status.message = message
handler_status.code = code
handler_status.status = status
@@ -980,7 +1114,7 @@ class ExtHandlerInstance(object):
def get_log_dir(self):
return os.path.join(conf.get_ext_log_dir(), self.ext_handler.name,
- self.ext_handler.properties.version)
+ str(self.ext_handler.properties.version))
class HandlerEnvironment(object):
def __init__(self, data):
diff --git a/azurelinuxagent/ga/monitor.py b/azurelinuxagent/ga/monitor.py
index 307a514..71ac9b0 100644
--- a/azurelinuxagent/ga/monitor.py
+++ b/azurelinuxagent/ga/monitor.py
@@ -21,8 +21,10 @@ import os
import platform
import time
import threading
+import uuid
import azurelinuxagent.common.conf as conf
+import azurelinuxagent.common.utils.fileutil as fileutil
import azurelinuxagent.common.logger as logger
from azurelinuxagent.common.event import add_event, WALAEventOperation
@@ -34,6 +36,7 @@ from azurelinuxagent.common.protocol.restapi import TelemetryEventParam, \
TelemetryEventList, \
TelemetryEvent, \
set_properties
+from azurelinuxagent.common.utils.restutil import IOErrorCounter
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, getattrib
from azurelinuxagent.common.version import DISTRO_NAME, DISTRO_VERSION, \
DISTRO_CODE_NAME, AGENT_LONG_VERSION, \
@@ -179,15 +182,53 @@ class MonitorHandler(object):
def daemon(self):
period = datetime.timedelta(minutes=30)
+ protocol = self.protocol_util.get_protocol()
last_heartbeat = datetime.datetime.utcnow() - period
+
+ # Create a new identifier on each restart and reset the counter
+ heartbeat_id = str(uuid.uuid4()).upper()
+ counter = 0
while True:
if datetime.datetime.utcnow() >= (last_heartbeat + period):
last_heartbeat = datetime.datetime.utcnow()
+ incarnation = protocol.get_incarnation()
+ dropped_packets = self.osutil.get_firewall_dropped_packets(
+ protocol.endpoint)
+
+ msg = "{0};{1};{2};{3}".format(
+ incarnation, counter, heartbeat_id, dropped_packets)
+
add_event(
name=AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.HeartBeat,
- is_success=True)
+ is_success=True,
+ message=msg,
+ log_event=False)
+
+ counter += 1
+
+ io_errors = IOErrorCounter.get_and_reset()
+ hostplugin_errors = io_errors.get("hostplugin")
+ protocol_errors = io_errors.get("protocol")
+ other_errors = io_errors.get("other")
+
+ if hostplugin_errors > 0 \
+ or protocol_errors > 0 \
+ or other_errors > 0:
+
+ msg = "hostplugin:{0};protocol:{1};other:{2}"\
+ .format(hostplugin_errors,
+ protocol_errors,
+ other_errors)
+ add_event(
+ name=AGENT_NAME,
+ version=CURRENT_VERSION,
+ op=WALAEventOperation.HttpErrors,
+ is_success=True,
+ message=msg,
+ log_event=False)
+
try:
self.collect_and_send_events()
except Exception as e:
diff --git a/azurelinuxagent/ga/update.py b/azurelinuxagent/ga/update.py
index b7ee96a..2e43031 100644
--- a/azurelinuxagent/ga/update.py
+++ b/azurelinuxagent/ga/update.py
@@ -21,9 +21,11 @@ import glob
import json
import os
import platform
+import random
import re
import shutil
import signal
+import stat
import subprocess
import sys
import time
@@ -43,6 +45,7 @@ from azurelinuxagent.common.event import add_event, add_periodic, \
WALAEventOperation
from azurelinuxagent.common.exception import ProtocolError, \
ResourceGoneError, \
+ RestartError, \
UpdateError
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.osutil import get_osutil
@@ -60,7 +63,7 @@ from azurelinuxagent.ga.exthandlers import HandlerManifest
AGENT_ERROR_FILE = "error.json" # File name for agent error record
AGENT_MANIFEST_FILE = "HandlerManifest.json"
-AGENT_SUPPORTED_FILE = "supported.json"
+AGENT_PARTITION_FILE = "partition"
CHILD_HEALTH_INTERVAL = 15 * 60
CHILD_LAUNCH_INTERVAL = 5 * 60
@@ -71,10 +74,18 @@ MAX_FAILURE = 3 # Max failure allowed for agent before blacklisted
GOAL_STATE_INTERVAL = 3
-ORPHAN_WAIT_INTERVAL = 15 * 60 * 60
+ORPHAN_WAIT_INTERVAL = 15 * 60
AGENT_SENTINAL_FILE = "current_version"
+READONLY_FILE_GLOBS = [
+ "*.crt",
+ "*.p7m",
+ "*.pem",
+ "*.prv",
+ "ovf-env.xml"
+]
+
def get_update_handler():
return UpdateHandler()
@@ -101,7 +112,6 @@ class UpdateHandler(object):
self.child_process = None
self.signal_handler = None
- return
def run_latest(self, child_args=None):
"""
@@ -233,36 +243,43 @@ class UpdateHandler(object):
This is the main loop which watches for agent and extension updates.
"""
- logger.info(u"Agent {0} is running as the goal state agent",
- CURRENT_AGENT)
+ try:
+ logger.info(u"Agent {0} is running as the goal state agent",
+ CURRENT_AGENT)
- # Launch monitoring threads
- from azurelinuxagent.ga.monitor import get_monitor_handler
- get_monitor_handler().run()
+ # Launch monitoring threads
+ from azurelinuxagent.ga.monitor import get_monitor_handler
+ get_monitor_handler().run()
- from azurelinuxagent.ga.env import get_env_handler
- get_env_handler().run()
+ from azurelinuxagent.ga.env import get_env_handler
+ get_env_handler().run()
- from azurelinuxagent.ga.exthandlers import get_exthandlers_handler, migrate_handler_state
- exthandlers_handler = get_exthandlers_handler()
- migrate_handler_state()
+ from azurelinuxagent.ga.exthandlers import get_exthandlers_handler, migrate_handler_state
+ exthandlers_handler = get_exthandlers_handler()
+ migrate_handler_state()
- try:
self._ensure_no_orphans()
self._emit_restart_event()
+ self._ensure_partition_assigned()
+ self._ensure_readonly_files()
while self.running:
if self._is_orphaned:
- logger.info("Goal state agent {0} was orphaned -- exiting",
+ logger.info("Agent {0} is an orphan -- exiting",
CURRENT_AGENT)
break
if self._upgrade_available():
- if len(self.agents) > 0:
+ available_agent = self.get_latest_agent()
+ if available_agent is None:
+ logger.info(
+ "Agent {0} is reverting to the installed agent -- exiting",
+ CURRENT_AGENT)
+ else:
logger.info(
- u"Agent {0} discovered {1} as an update and will exit",
+ u"Agent {0} discovered update {1} -- exiting",
CURRENT_AGENT,
- self.agents[0].name)
+ available_agent.name)
break
utc_start = datetime.utcnow()
@@ -271,20 +288,24 @@ class UpdateHandler(object):
exthandlers_handler.run()
if last_etag != exthandlers_handler.last_etag:
+ self._ensure_readonly_files()
add_event(
AGENT_NAME,
version=CURRENT_VERSION,
op=WALAEventOperation.ProcessGoalState,
is_success=True,
duration=elapsed_milliseconds(utc_start),
+ message="Incarnation {0}".format(
+ exthandlers_handler.last_etag),
log_event=True)
time.sleep(GOAL_STATE_INTERVAL)
except Exception as e:
- logger.warn(u"Agent {0} failed with exception: {1}",
- CURRENT_AGENT,
- ustr(e))
+ msg = u"Agent {0} failed with exception: {1}".format(
+ CURRENT_AGENT, ustr(e))
+ self._set_sentinal(msg=msg)
+ logger.warn(msg)
logger.warn(traceback.format_exc())
sys.exit(1)
# additional return here because sys.exit is mocked in unit tests
@@ -338,17 +359,21 @@ class UpdateHandler(object):
return available_agents[0] if len(available_agents) >= 1 else None
def _emit_restart_event(self):
- if not self._is_clean_start:
- msg = u"{0} did not terminate cleanly".format(CURRENT_AGENT)
- logger.info(msg)
- add_event(
- AGENT_NAME,
- version=CURRENT_VERSION,
- op=WALAEventOperation.Restart,
- is_success=False,
- message=msg)
+ try:
+ if not self._is_clean_start:
+ msg = u"Agent did not terminate cleanly: {0}".format(
+ fileutil.read_file(self._sentinal_file_path()))
+ logger.info(msg)
+ add_event(
+ AGENT_NAME,
+ version=CURRENT_VERSION,
+ op=WALAEventOperation.Restart,
+ is_success=False,
+ message=msg)
+ except Exception:
+ pass
- self._set_sentinal()
+ self._set_sentinal(msg="Starting")
return
def _ensure_no_orphans(self, orphan_wait_interval=ORPHAN_WAIT_INTERVAL):
@@ -382,6 +407,26 @@ class UpdateHandler(object):
ustr(e))
return
+ def _ensure_partition_assigned(self):
+ """
+ Assign the VM to a partition (0 - 99). Downloaded updates may be configured
+ to run on only some VMs; the assigned partition determines eligibility.
+ """
+ if not os.path.exists(self._partition_file):
+ partition = ustr(int(datetime.utcnow().microsecond / 10000))
+ fileutil.write_file(self._partition_file, partition)
+ add_event(
+ AGENT_NAME,
+ version=CURRENT_VERSION,
+ op=WALAEventOperation.Partition,
+ is_success=True,
+ message=partition)
+
+ def _ensure_readonly_files(self):
+ for g in READONLY_FILE_GLOBS:
+ for path in glob.iglob(os.path.join(conf.get_lib_dir(), g)):
+ os.chmod(path, stat.S_IRUSR)
+
def _evaluate_agent_health(self, latest_agent):
"""
Evaluate the health of the selected agent: If it is restarting
@@ -412,7 +457,6 @@ class UpdateHandler(object):
def _filter_blacklisted_agents(self):
self.agents = [agent for agent in self.agents if not agent.is_blacklisted]
- return
def _find_agents(self):
"""
@@ -447,19 +491,7 @@ class UpdateHandler(object):
@property
def _is_clean_start(self):
- if not os.path.isfile(self._sentinal_file_path()):
- return True
-
- try:
- if fileutil.read_file(self._sentinal_file_path()) != CURRENT_AGENT:
- return True
- except Exception as e:
- logger.warn(
- u"Exception reading sentinal file {0}: {1}",
- self._sentinal_file_path(),
- str(e))
-
- return False
+ return not os.path.isfile(self._sentinal_file_path())
@property
def _is_orphaned(self):
@@ -472,11 +504,29 @@ class UpdateHandler(object):
return fileutil.read_file(conf.get_agent_pid_file_path()) != ustr(parent_pid)
+ def _is_version_eligible(self, version):
+ # Ensure the installed version is always eligible
+ if version == CURRENT_VERSION and is_current_agent_installed():
+ return True
+
+ for agent in self.agents:
+ if agent.version == version:
+ return agent.is_available
+
+ return False
+
def _load_agents(self):
path = os.path.join(conf.get_lib_dir(), "{0}-*".format(AGENT_NAME))
return [GuestAgent(path=agent_dir)
for agent_dir in glob.iglob(path) if os.path.isdir(agent_dir)]
+ def _partition(self):
+ return int(fileutil.read_file(self._partition_file))
+
+ @property
+ def _partition_file(self):
+ return os.path.join(conf.get_lib_dir(), AGENT_PARTITION_FILE)
+
def _purge_agents(self):
"""
Remove from disk all directories and .zip files of unknown agents
@@ -485,8 +535,8 @@ class UpdateHandler(object):
path = os.path.join(conf.get_lib_dir(), "{0}-*".format(AGENT_NAME))
known_versions = [agent.version for agent in self.agents]
- if not is_current_agent_installed() and CURRENT_VERSION not in known_versions:
- logger.warn(
+ if CURRENT_VERSION not in known_versions:
+ logger.info(
u"Running Agent {0} was not found in the agent manifest - adding to list",
CURRENT_VERSION)
known_versions.append(CURRENT_VERSION)
@@ -511,9 +561,11 @@ class UpdateHandler(object):
self.agents.sort(key=lambda agent: agent.version, reverse=True)
return
- def _set_sentinal(self, agent=CURRENT_AGENT):
+ def _set_sentinal(self, agent=CURRENT_AGENT, msg="Unknown cause"):
try:
- fileutil.write_file(self._sentinal_file_path(), agent)
+ fileutil.write_file(
+ self._sentinal_file_path(),
+ "[{0}] [{1}]".format(agent, msg))
except Exception as e:
logger.warn(
u"Exception writing sentinal file {0}: {1}",
@@ -599,16 +651,18 @@ class UpdateHandler(object):
self._purge_agents()
self._filter_blacklisted_agents()
- # Return True if more recent agents are available
- return len(self.agents) > 0 and \
- self.agents[0].version > base_version
+ # Return True if current agent is no longer available or an
+ # agent with a higher version number is available
+ return not self._is_version_eligible(base_version) \
+ or (len(self.agents) > 0 \
+ and self.agents[0].version > base_version)
except Exception as e:
if isinstance(e, ResourceGoneError):
continue
msg = u"Exception retrieving agent manifests: {0}".format(
- ustr(e))
+ ustr(traceback.format_exc()))
logger.warn(msg)
add_event(
AGENT_NAME,
@@ -667,8 +721,6 @@ class GuestAgent(object):
self.error = GuestAgentError(self.get_agent_error_file())
self.error.load()
- self.supported = Supported(self.get_agent_supported_file())
- self.supported.load()
try:
self._ensure_downloaded()
@@ -677,6 +729,13 @@ class GuestAgent(object):
if isinstance(e, ResourceGoneError):
raise
+ # The agent was improperly blacklisting versions due to a timeout
+ # encountered while downloading a later version. Errors of type
+ # socket.error are IOError, so this should provide sufficient
+ # protection against a large class of I/O operation failures.
+ if isinstance(e, IOError):
+ raise
+
# Note the failure, blacklist the agent if the package downloaded
# - An exception with a downloaded package indicates the package
# is corrupt (e.g., missing the HandlerManifest.json file)
@@ -691,7 +750,6 @@ class GuestAgent(object):
op=WALAEventOperation.Install,
is_success=False,
message=msg)
- return
@property
def name(self):
@@ -712,13 +770,9 @@ class GuestAgent(object):
def get_agent_pkg_path(self):
return ".".join((os.path.join(conf.get_lib_dir(), self.name), "zip"))
- def get_agent_supported_file(self):
- return os.path.join(conf.get_lib_dir(), self.name, AGENT_SUPPORTED_FILE)
-
def clear_error(self):
self.error.clear()
self.error.save()
- return
@property
def is_available(self):
@@ -730,15 +784,8 @@ class GuestAgent(object):
@property
def is_downloaded(self):
- return self.is_blacklisted or os.path.isfile(self.get_agent_manifest_path())
-
- @property
- def _is_optional(self):
- return self.error is not None and self.error.is_sentinel and self.supported.is_supported
-
- @property
- def _in_slice(self):
- return self.supported.is_supported and self.supported.in_slice
+ return self.is_blacklisted or \
+ os.path.isfile(self.get_agent_manifest_path())
def mark_failure(self, is_fatal=False):
try:
@@ -750,21 +797,6 @@ class GuestAgent(object):
logger.warn(u"Agent {0} is permanently blacklisted", self.name)
except Exception as e:
logger.warn(u"Agent {0} failed recording error state: {1}", self.name, ustr(e))
- return
-
- def _enable(self):
- # Enable optional agents if within the "slice"
- # - The "slice" is a percentage of the agent to execute
- # - Blacklist out-of-slice agents to prevent reconsideration
- if self._is_optional:
- if self._in_slice:
- self.error.clear()
- self.error.save()
- logger.info(u"Enabled optional Agent {0}", self.name)
- else:
- self.mark_failure(is_fatal=True)
- logger.info(u"Optional Agent {0} not in slice", self.name)
- return
def _ensure_downloaded(self):
logger.verbose(u"Ensuring Agent {0} is downloaded", self.name)
@@ -788,18 +820,15 @@ class GuestAgent(object):
op=WALAEventOperation.Install,
is_success=True,
message=msg)
- return
def _ensure_loaded(self):
self._load_manifest()
self._load_error()
- self._load_supported()
-
- self._enable()
- return
def _download(self):
- for uri in self.pkg.uris:
+ uris_shuffled = self.pkg.uris
+ random.shuffle(uris_shuffled)
+ for uri in uris_shuffled:
if not HostPluginProtocol.is_default_channel() and self._fetch(uri.uri):
break
@@ -838,8 +867,6 @@ class GuestAgent(object):
message=msg)
raise UpdateError(msg)
- return
-
def _fetch(self, uri, headers=None, use_proxy=True):
package = None
try:
@@ -871,14 +898,6 @@ class GuestAgent(object):
logger.verbose(u"Agent {0} error state: {1}", self.name, ustr(self.error))
except Exception as e:
logger.warn(u"Agent {0} failed loading error state: {1}", self.name, ustr(e))
- return
-
- def _load_supported(self):
- try:
- self.supported = Supported(self.get_agent_supported_file())
- self.supported.load()
- except Exception as e:
- self.supported = Supported()
def _load_manifest(self):
path = self.get_agent_manifest_path()
@@ -976,10 +995,6 @@ class GuestAgentError(object):
def is_blacklisted(self):
return self.was_fatal or self.failure_count >= MAX_FAILURE
- @property
- def is_sentinel(self):
- return self.was_fatal and self.last_failure == 0.0
-
def load(self):
if self.path is not None and os.path.isfile(self.path):
with open(self.path, 'r') as f:
@@ -1015,60 +1030,3 @@ class GuestAgentError(object):
self.last_failure,
self.failure_count,
self.was_fatal)
-
-class Supported(object):
- def __init__(self, path):
- if path is None:
- raise UpdateError(u"Supported requires a path")
- self.path = path
- self.distributions = {}
- return
-
- @property
- def is_supported(self):
- return self._supported_distribution is not None
-
- @property
- def in_slice(self):
- d = self._supported_distribution
- return d is not None and d.in_slice
-
- def load(self):
- self.distributions = {}
- try:
- if self.path is not None and os.path.isfile(self.path):
- j = json.loads(fileutil.read_file(self.path))
- for d in j:
- self.distributions[d] = SupportedDistribution(j[d])
- except Exception as e:
- logger.warn("Failed JSON parse of {0}: {1}".format(self.path, e))
- return
-
- @property
- def _supported_distribution(self):
- for d in self.distributions:
- dd = self.distributions[d]
- if dd.is_supported:
- return dd
- return None
-
-class SupportedDistribution(object):
- def __init__(self, s):
- if s is None or not isinstance(s, dict):
- raise UpdateError(u"SupportedDisribution requires a dictionary")
-
- self.slice = s['slice']
- self.versions = s['versions']
-
- @property
- def is_supported(self):
- d = ','.join(platform.linux_distribution())
- for v in self.versions:
- if re.match(v, d):
- return True
- return False
-
- @property
- def in_slice(self):
- n = int((60 * self.slice) / 100)
- return (n - datetime.utcnow().second) > 0