summaryrefslogtreecommitdiff
path: root/azurelinuxagent/ga
diff options
context:
space:
mode:
Diffstat (limited to 'azurelinuxagent/ga')
-rw-r--r--azurelinuxagent/ga/__init__.py17
-rw-r--r--azurelinuxagent/ga/env.py112
-rw-r--r--azurelinuxagent/ga/exthandlers.py902
-rw-r--r--azurelinuxagent/ga/monitor.py192
-rw-r--r--azurelinuxagent/ga/update.py715
5 files changed, 1938 insertions, 0 deletions
diff --git a/azurelinuxagent/ga/__init__.py b/azurelinuxagent/ga/__init__.py
new file mode 100644
index 0000000..1ea2f38
--- /dev/null
+++ b/azurelinuxagent/ga/__init__.py
@@ -0,0 +1,17 @@
+# Copyright 2014 Microsoft Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Requires Python 2.4+ and Openssl 1.0+
+#
+
diff --git a/azurelinuxagent/ga/env.py b/azurelinuxagent/ga/env.py
new file mode 100644
index 0000000..2d67d4b
--- /dev/null
+++ b/azurelinuxagent/ga/env.py
@@ -0,0 +1,112 @@
+# Microsoft Azure Linux Agent
+#
+# Copyright 2014 Microsoft Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Requires Python 2.4+ and Openssl 1.0+
+#
+
+import os
+import socket
+import time
+import threading
+
+import azurelinuxagent.common.conf as conf
+import azurelinuxagent.common.logger as logger
+
+from azurelinuxagent.common.dhcp import get_dhcp_handler
+from azurelinuxagent.common.osutil import get_osutil
+
def get_env_handler():
    """Create and return a new EnvHandler."""
    handler = EnvHandler()
    return handler
+
class EnvHandler(object):
    """
    Background monitor for the VM environment.

    On every pass the monitor removes rules files, applies the configured
    scsi disk timeout (when one is configured), optionally tracks hostname
    changes, and re-configures routes through the dhcp handler when the
    dhcp client process has been restarted.
    """
    def __init__(self):
        self.osutil = get_osutil()
        self.dhcp_handler = get_dhcp_handler()
        self.server_thread = None
        self.dhcpid = None
        self.hostname = None
        # True until run() is called; the monitor loop exits once it is set.
        self.stopped = True

    def run(self):
        """Start the monitor thread, stopping a still-running one first."""
        if not self.stopped:
            logger.info("Stop existing env monitor service.")
            self.stop()

        self.stopped = False
        logger.info("Start env monitor service.")
        self.dhcp_handler.conf_routes()
        # Remember the current hostname / dhcp pid as the comparison baseline.
        self.hostname = socket.gethostname()
        self.dhcpid = self.osutil.get_dhcp_pid()
        worker = threading.Thread(target=self.monitor)
        self.server_thread = worker
        worker.setDaemon(True)
        worker.start()

    def monitor(self):
        """
        Loop body of the monitor thread.

        Runs until `stopped` is set, sleeping 5 seconds between passes.
        """
        while not self.stopped:
            self.osutil.remove_rules_files()
            scsi_timeout = conf.get_root_device_scsi_timeout()
            if scsi_timeout is not None:
                self.osutil.set_scsi_disks_timeout(scsi_timeout)
            if conf.get_monitor_hostname():
                self.handle_hostname_update()
            self.handle_dhclient_restart()
            time.sleep(5)

    def handle_hostname_update(self):
        """Apply and publish the hostname when it differs from the last one seen."""
        current = socket.gethostname()
        if current == self.hostname:
            return
        logger.info("EnvMonitor: Detected host name change: {0} -> {1}",
                    self.hostname, current)
        self.osutil.set_hostname(current)
        self.osutil.publish_hostname(current)
        self.hostname = current

    def handle_dhclient_restart(self):
        """Re-configure routes when the dhcp client runs under a new pid."""
        if self.dhcpid is None:
            logger.warn("Dhcp client is not running. ")
            self.dhcpid = self.osutil.get_dhcp_pid()
            return

        # The recorded dhcp process is still alive: nothing has changed.
        if self.osutil.check_pid_alive(self.dhcpid.strip()):
            return

        latest_pid = self.osutil.get_dhcp_pid()
        if latest_pid is None or latest_pid == self.dhcpid:
            return
        logger.info("EnvMonitor: Detected dhcp client restart. "
                    "Restoring routing table.")
        self.dhcp_handler.conf_routes()
        self.dhcpid = latest_pid

    def stop(self):
        """Signal the monitor loop to exit and join the monitor thread, if any."""
        self.stopped = True
        worker = self.server_thread
        if worker is not None:
            worker.join()
+
diff --git a/azurelinuxagent/ga/exthandlers.py b/azurelinuxagent/ga/exthandlers.py
new file mode 100644
index 0000000..d3c8f32
--- /dev/null
+++ b/azurelinuxagent/ga/exthandlers.py
@@ -0,0 +1,902 @@
+# Microsoft Azure Linux Agent
+#
+# Copyright 2014 Microsoft Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Requires Python 2.4+ and Openssl 1.0+
+#
+
+import glob
+import json
+import os
+import shutil
+import subprocess
+import time
+import zipfile
+
+import azurelinuxagent.common.conf as conf
+import azurelinuxagent.common.logger as logger
+import azurelinuxagent.common.utils.fileutil as fileutil
+import azurelinuxagent.common.utils.restutil as restutil
+import azurelinuxagent.common.utils.shellutil as shellutil
+
+from azurelinuxagent.common.event import add_event, WALAEventOperation
+from azurelinuxagent.common.exception import ExtensionError, ProtocolError, HttpError
+from azurelinuxagent.common.future import ustr
+from azurelinuxagent.common.version import AGENT_VERSION
+from azurelinuxagent.common.protocol.restapi import ExtHandlerStatus, \
+ ExtensionStatus, \
+ ExtensionSubStatus, \
+ Extension, \
+ VMStatus, ExtHandler, \
+ get_properties, \
+ set_properties
+from azurelinuxagent.common.utils.flexible_version import FlexibleVersion
+from azurelinuxagent.common.utils.textutil import Version
+from azurelinuxagent.common.protocol import get_protocol_util
+from azurelinuxagent.common.version import AGENT_NAME, CURRENT_AGENT, CURRENT_VERSION
+
+#HandlerEnvironment.json schema version
+HANDLER_ENVIRONMENT_VERSION = 1.0
+
+VALID_EXTENSION_STATUS = ['transitioning', 'error', 'success', 'warning']
+
+VALID_HANDLER_STATUS = ['Ready', 'NotReady', "Installing", "Unresponsive"]
+
def validate_has_key(obj, key, fullname):
    """Raise ExtensionError naming `fullname` when `key` is not in `obj`."""
    if key in obj:
        return
    raise ExtensionError("Missing: {0}".format(fullname))
+
def validate_in_range(val, valid_range, name):
    """Raise ExtensionError naming `name` when `val` is not in `valid_range`."""
    if val in valid_range:
        return
    raise ExtensionError("Invalid {0}: {1}".format(name, val))
+
def parse_formatted_message(formatted_message):
    """
    Return the 'message' entry of a formattedMessage mapping.

    Returns None when `formatted_message` is None. Both the 'lang' and
    'message' keys are validated with validate_has_key before the message
    is read.
    """
    if formatted_message is not None:
        validate_has_key(formatted_message, 'lang', 'formattedMessage/lang')
        validate_has_key(formatted_message, 'message', 'formattedMessage/message')
        return formatted_message.get('message')
    return None
+
def parse_ext_substatus(substatus):
    """
    Build an ExtensionSubStatus from one entry of a status file's
    'substatus' list.

    The entry must carry a 'status' key whose value is one of
    VALID_EXTENSION_STATUS; 'code' defaults to 0 when absent.
    """
    # Check extension sub status format before reading anything from it
    validate_has_key(substatus, 'status', 'substatus/status')
    validate_in_range(substatus['status'], VALID_EXTENSION_STATUS,
                      'substatus/status')

    result = ExtensionSubStatus()
    result.name = substatus.get('name')
    result.status = substatus.get('status')
    result.code = substatus.get('code', 0)
    result.message = parse_formatted_message(substatus.get('formattedMessage'))
    return result
+
def parse_ext_status(ext_status, data):
    """
    Populate `ext_status` from the parsed content of an extension status file.

    `data` is the decoded JSON document (a list of status entries). Only the
    first entry is reported. Nothing is changed when `data` is None or empty.

    Raises ExtensionError (through the validate_* helpers) when the required
    'status' keys are missing or carry a value outside VALID_EXTENSION_STATUS.
    """
    # An empty list has no first entry to report; the previous
    # `len(data) is None` test could never be true and let [] through.
    if not data:
        return
    # Currently, only the first status will be reported
    data = data[0]
    # Check extension status format
    validate_has_key(data, 'status', 'status')
    status_data = data['status']
    validate_has_key(status_data, 'status', 'status/status')

    validate_in_range(status_data['status'], VALID_EXTENSION_STATUS,
                      'status/status')

    applied_time = status_data.get('configurationAppliedTime')
    ext_status.configurationAppliedTime = applied_time
    ext_status.operation = status_data.get('operation')
    ext_status.status = status_data.get('status')
    ext_status.code = status_data.get('code', 0)
    formatted_message = status_data.get('formattedMessage')
    ext_status.message = parse_formatted_message(formatted_message)
    substatus_list = status_data.get('substatus')
    if substatus_list is None:
        return
    for substatus in substatus_list:
        ext_status.substatusList.append(parse_ext_substatus(substatus))
+
class ExtHandlerState(object):
    """String constants naming the states stored in a handler's state file."""
    # No state file exists for the handler
    NotInstalled = "NotInstalled"
    # Written after install / disable
    Installed = "Installed"
    # Written after enable
    Enabled = "Enabled"
+
def get_exthandlers_handler():
    """Create and return a new ExtHandlersHandler."""
    handler = ExtHandlersHandler()
    return handler
+
class ExtHandlersHandler(object):
    """
    Apply the extension handler configuration obtained from the protocol
    and report the resulting VM / handler status back through it.
    """
    def __init__(self):
        self.protocol_util = get_protocol_util()
        # Assigned by run(); initialised here so that status reporting before
        # a successful run() does not fail with AttributeError.
        self.protocol = None
        self.ext_handlers = None
        self.last_etag = None
        self.log_report = False

    def run(self):
        """
        Fetch the extension handler configuration, process it when its etag
        differs from the last processed one, then report status.

        A ProtocolError while fetching is logged, reported as an event, and
        ends the run without reporting status.
        """
        self.ext_handlers, etag = None, None
        try:
            self.protocol = self.protocol_util.get_protocol()
            self.ext_handlers, etag = self.protocol.get_ext_handlers()
        except ProtocolError as e:
            msg = u"Exception retrieving extension handlers: {0}".format(
                ustr(e))
            logger.warn(msg)
            add_event(AGENT_NAME, version=CURRENT_VERSION, is_success=False, message=msg)
            return

        if self.last_etag is not None and self.last_etag == etag:
            msg = u"Incarnation {0} has no extension updates".format(etag)
            logger.verbose(msg)
            self.log_report = False
        else:
            msg = u"Handle extensions updates for incarnation {0}".format(etag)
            logger.info(msg)
            self.log_report = True  # Log status report success on new config
            self.handle_ext_handlers()
            self.last_etag = etag

        self.report_ext_handlers_status()

    def run_status(self):
        """Report status only, without processing any configuration."""
        self.report_ext_handlers_status()
        return

    def handle_ext_handlers(self):
        """Process every handler of the current configuration, in order."""
        if self.ext_handlers.extHandlers is None or \
                len(self.ext_handlers.extHandlers) == 0:
            logger.info("No ext handler config found")
            return

        for ext_handler in self.ext_handlers.extHandlers:
            # TODO handle install in sequence, enable in parallel
            self.handle_ext_handler(ext_handler)

    def handle_ext_handler(self, ext_handler):
        """
        Dispatch one handler on its requested state.

        An ExtensionError raised while handling is recorded as the handler
        status (code -1) and reported as a failed event; it does not
        propagate.
        """
        ext_handler_i = ExtHandlerInstance(ext_handler, self.protocol)
        try:
            state = ext_handler.properties.state
            ext_handler_i.logger.info("Expected handler state: {0}", state)
            if state == "enabled":
                self.handle_enable(ext_handler_i)
            elif state == u"disabled":
                self.handle_disable(ext_handler_i)
            elif state == u"uninstall":
                self.handle_uninstall(ext_handler_i)
            else:
                message = u"Unknown ext handler state:{0}".format(state)
                raise ExtensionError(message)
        except ExtensionError as e:
            ext_handler_i.set_handler_status(message=ustr(e), code=-1)
            ext_handler_i.report_event(message=ustr(e), is_success=False)

    def handle_enable(self, ext_handler_i):
        """
        Bring the handler to the enabled state.

        A handler that is not yet installed is downloaded and then either
        installed (nothing installed before) or updated from the older
        installed version. An already installed handler only gets its
        settings refreshed. Raises ExtensionError on a downgrade.
        """
        ext_handler_i.decide_version()

        old_ext_handler_i = ext_handler_i.get_installed_ext_handler()
        if old_ext_handler_i is not None and \
                old_ext_handler_i.version_gt(ext_handler_i):
            raise ExtensionError(u"Downgrade not allowed")

        handler_state = ext_handler_i.get_handler_state()
        ext_handler_i.logger.info("Current handler state is: {0}", handler_state)
        if handler_state == ExtHandlerState.NotInstalled:
            ext_handler_i.set_handler_state(ExtHandlerState.NotInstalled)

            ext_handler_i.download()

            ext_handler_i.update_settings()

            if old_ext_handler_i is None:
                ext_handler_i.install()
            elif ext_handler_i.version_gt(old_ext_handler_i):
                old_ext_handler_i.disable()
                ext_handler_i.copy_status_files(old_ext_handler_i)
                ext_handler_i.update()
                old_ext_handler_i.uninstall()
                old_ext_handler_i.rm_ext_handler_dir()
                ext_handler_i.update_with_install()
        else:
            ext_handler_i.update_settings()

        ext_handler_i.enable()

    def handle_disable(self, ext_handler_i):
        """Disable the handler when it is currently enabled."""
        handler_state = ext_handler_i.get_handler_state()
        ext_handler_i.logger.info("Current handler state is: {0}", handler_state)
        if handler_state == ExtHandlerState.Enabled:
            ext_handler_i.disable()

    def handle_uninstall(self, ext_handler_i):
        """Disable (if enabled) and uninstall the handler, then remove its dirs."""
        handler_state = ext_handler_i.get_handler_state()
        ext_handler_i.logger.info("Current handler state is: {0}", handler_state)
        if handler_state != ExtHandlerState.NotInstalled:
            if handler_state == ExtHandlerState.Enabled:
                ext_handler_i.disable()
            ext_handler_i.uninstall()
        ext_handler_i.rm_ext_handler_dir()

    def report_ext_handlers_status(self):
        """Go thru handler_state dir, collect and report status"""
        vm_status = VMStatus()
        vm_status.vmAgent.version = str(CURRENT_VERSION)
        vm_status.vmAgent.status = "Ready"
        vm_status.vmAgent.message = "Guest Agent is running"

        if self.ext_handlers is not None:
            # extHandlers may be None (see handle_ext_handlers)
            for ext_handler in self.ext_handlers.extHandlers or []:
                try:
                    self.report_ext_handler_status(vm_status, ext_handler)
                except ExtensionError as e:
                    add_event(
                        AGENT_NAME,
                        version=CURRENT_VERSION,
                        is_success=False,
                        message=ustr(e))

        logger.verbose("Report vm agent status")

        try:
            # run_status() may be called before run() obtained a protocol
            if self.protocol is None:
                self.protocol = self.protocol_util.get_protocol()
            self.protocol.report_vm_status(vm_status)
        except ProtocolError as e:
            message = "Failed to report vm agent status: {0}".format(e)
            add_event(AGENT_NAME, version=CURRENT_VERSION, is_success=False, message=message)
            # Do not fall through to the success message below
            return

        if self.log_report:
            logger.verbose("Successfully reported vm agent status")

    def report_ext_handler_status(self, vm_status, ext_handler):
        """
        Append the stored status of one handler to `vm_status`.

        Handlers without a stored status are skipped. For handlers that are
        not in the NotInstalled state, extension status is reported and the
        heartbeat (if any) overrides the handler status; an ExtensionError in
        either step is written back as the handler status with code -1.
        """
        ext_handler_i = ExtHandlerInstance(ext_handler, self.protocol)

        handler_status = ext_handler_i.get_handler_status()
        if handler_status is None:
            return

        handler_state = ext_handler_i.get_handler_state()
        if handler_state != ExtHandlerState.NotInstalled:
            try:
                active_exts = ext_handler_i.report_ext_status()
                handler_status.extensions.extend(active_exts)
            except ExtensionError as e:
                ext_handler_i.set_handler_status(message=ustr(e), code=-1)

            try:
                heartbeat = ext_handler_i.collect_heartbeat()
                if heartbeat is not None:
                    handler_status.status = heartbeat.get('status')
            except ExtensionError as e:
                ext_handler_i.set_handler_status(message=ustr(e), code=-1)

        vm_status.vmAgent.extensionHandlers.append(handler_status)
+
class ExtHandlerInstance(object):
    """
    One extension handler at one version: its on-disk layout under the lib
    dir, its lifecycle commands (install / enable / disable / update /
    uninstall) and its state, status and heartbeat files.
    """
    def __init__(self, ext_handler, protocol):
        self.ext_handler = ext_handler
        self.protocol = protocol
        self.operation = None
        self.pkg = None

        prefix = "[{0}]".format(self.get_full_name())
        self.logger = logger.Logger(logger.DEFAULT_LOGGER, prefix)

        try:
            fileutil.mkdir(self.get_log_dir(), mode=0o744)
        except IOError as e:
            self.logger.error(u"Failed to create extension log dir: {0}", e)

        log_file = os.path.join(self.get_log_dir(), "CommandExecution.log")
        self.logger.add_appender(logger.AppenderType.FILE,
                                 logger.LogLevel.INFO, log_file)

    def decide_version(self):
        """
        Choose the package to use and store it in `self.pkg`; the handler's
        `properties.version` is updated to the chosen version string.

        Raises ExtensionError when the package list cannot be fetched or no
        usable package is found.
        """
        self.logger.info("Decide which version to use")
        try:
            pkg_list = self.protocol.get_ext_handler_pkgs(self.ext_handler)
        except ProtocolError as e:
            raise ExtensionError("Failed to get ext handler pkgs", e)

        # Determine the desired and installed versions. The None check has to
        # happen on the raw string: wrapping it in FlexibleVersion first
        # would make the check dead.
        requested_version = FlexibleVersion(self.ext_handler.properties.version)
        installed_version_string = self.get_installed_version()
        if installed_version_string is None:
            installed_version = requested_version
        else:
            installed_version = FlexibleVersion(installed_version_string)

        # Divide packages
        # - Find the installed package (its version must exactly match)
        # - Find the internal candidate (its version must exactly match)
        # - Separate the public packages
        internal_pkg = None
        installed_pkg = None
        public_pkgs = []
        for pkg in pkg_list.versions:
            pkg_version = FlexibleVersion(pkg.version)
            if pkg_version == installed_version:
                installed_pkg = pkg
            if pkg.isinternal and pkg_version == requested_version:
                internal_pkg = pkg
            if not pkg.isinternal:
                public_pkgs.append(pkg)

        internal_version = FlexibleVersion(internal_pkg.version) \
            if internal_pkg is not None \
            else FlexibleVersion()
        public_pkgs.sort(key=lambda pkg: FlexibleVersion(pkg.version), reverse=True)

        # Determine the preferred version and type of upgrade occurring
        preferred_version = max(requested_version, installed_version)
        is_major_upgrade = preferred_version.major > installed_version.major
        allow_minor_upgrade = self.ext_handler.properties.upgradePolicy == 'auto'

        # Find the first public candidate which
        # - Matches the preferred major version
        # - Does not upgrade to a new, disallowed major version
        # - And only increments the minor version if allowed
        # Notes:
        # - The patch / hotfix version is not considered
        public_pkg = None
        for pkg in public_pkgs:
            pkg_version = FlexibleVersion(pkg.version)
            if pkg_version.major == preferred_version.major \
                    and (not pkg.disallow_major_upgrade or not is_major_upgrade) \
                    and (allow_minor_upgrade or pkg_version.minor == preferred_version.minor):
                public_pkg = pkg
                break

        # If there are no candidates, locate the highest public version whose
        # major matches that installed
        if internal_pkg is None and public_pkg is None:
            for pkg in public_pkgs:
                pkg_version = FlexibleVersion(pkg.version)
                if pkg_version.major == installed_version.major:
                    public_pkg = pkg
                    break

        public_version = FlexibleVersion(public_pkg.version) \
            if public_pkg is not None \
            else FlexibleVersion()

        # Select the candidate
        # - Use the public candidate if there is no internal candidate or
        #   the public is more recent (e.g., a hotfix patch)
        # - Otherwise use the internal candidate
        if internal_pkg is None or (public_pkg is not None and public_version > internal_version):
            selected_pkg = public_pkg
        else:
            selected_pkg = internal_pkg

        selected_version = FlexibleVersion(selected_pkg.version) \
            if selected_pkg is not None \
            else FlexibleVersion()

        # Finally, update the version only if not downgrading
        # Note:
        # - A downgrade, which will be bound to the same major version,
        #   is allowed if the installed version is no longer available
        if selected_pkg is None \
                or (installed_pkg is not None and selected_version < installed_version):
            self.pkg = installed_pkg
            # Store a string: the path helpers pass the version to os.path.join
            self.ext_handler.properties.version = str(installed_version)
        else:
            self.pkg = selected_pkg
            self.ext_handler.properties.version = selected_pkg.version

        if self.pkg is None:
            raise ExtensionError("Failed to find any valid extension package")

        self.logger.info("Use version: {0}", self.pkg.version)
        return

    def version_gt(self, other):
        """Return True when this handler's version is greater than `other`'s."""
        self_version = self.ext_handler.properties.version
        other_version = other.ext_handler.properties.version
        return Version(self_version) > Version(other_version)

    def get_installed_ext_handler(self):
        """
        Return an ExtHandlerInstance for the highest installed version of
        this handler, or None when no version directory exists.
        """
        lastest_version = self.get_installed_version()
        if lastest_version is None:
            return None

        installed_handler = ExtHandler()
        set_properties("ExtHandler", installed_handler, get_properties(self.ext_handler))
        installed_handler.properties.version = lastest_version
        return ExtHandlerInstance(installed_handler, self.protocol)

    def get_installed_version(self):
        """
        Return the highest version (as a string) among the "<name>-<version>"
        directories in the lib dir, or None when there is none.
        """
        lastest_version = None

        for path in glob.iglob(os.path.join(conf.get_lib_dir(), self.ext_handler.name + "-*")):
            if not os.path.isdir(path):
                continue

            separator = path.rfind('-')
            version = FlexibleVersion(path[separator+1:])

            if lastest_version is None or lastest_version < version:
                lastest_version = version

        return str(lastest_version) if lastest_version is not None else None

    def copy_status_files(self, old_ext_handler_i):
        """Copy the mrseq file and all status files from the old handler's dirs."""
        self.logger.info("Copy status files from old plugin to new")
        old_ext_dir = old_ext_handler_i.get_base_dir()
        new_ext_dir = self.get_base_dir()

        old_ext_mrseq_file = os.path.join(old_ext_dir, "mrseq")
        if os.path.isfile(old_ext_mrseq_file):
            shutil.copy2(old_ext_mrseq_file, new_ext_dir)

        old_ext_status_dir = old_ext_handler_i.get_status_dir()
        new_ext_status_dir = self.get_status_dir()

        if os.path.isdir(old_ext_status_dir):
            for status_file in os.listdir(old_ext_status_dir):
                status_file = os.path.join(old_ext_status_dir, status_file)
                if os.path.isfile(status_file):
                    shutil.copy2(status_file, new_ext_status_dir)

    def set_operation(self, op):
        """Record the operation name attached to subsequently reported events."""
        self.operation = op

    def report_event(self, message="", is_success=True):
        """Report an event for this handler with the current operation."""
        version = self.ext_handler.properties.version
        add_event(name=self.ext_handler.name, version=version, message=message,
                  op=self.operation, is_success=is_success)

    def download(self):
        """
        Download and unpack the selected package, then initialise the
        handler directory (manifest copy, status / config dirs, environment
        file).

        Raises ExtensionError when no package was selected, every URI fails,
        the archive cannot be written or unpacked, or the manifest is missing.
        """
        self.logger.info("Download extension package")
        self.set_operation(WALAEventOperation.Download)
        if self.pkg is None:
            raise ExtensionError("No package uri found")

        package = None
        package_uri = None
        for uri in self.pkg.uris:
            try:
                package = self.protocol.download_ext_handler_pkg(uri.uri)
            except ProtocolError as e:
                logger.warn("Failed download extension: {0}", e)
                continue
            if package is not None:
                # Stop at the first URI that delivers the package, and
                # remember it so the zip is named after the right URI.
                package_uri = uri.uri
                break

        if package is None:
            raise ExtensionError("Failed to download extension")

        self.logger.info("Unpack extension package")
        pkg_file = os.path.join(conf.get_lib_dir(),
                                os.path.basename(package_uri) + ".zip")
        try:
            fileutil.write_file(pkg_file, bytearray(package), asbin=True)
            archive = zipfile.ZipFile(pkg_file)
            try:
                archive.extractall(self.get_base_dir())
            finally:
                archive.close()
        except (IOError, zipfile.BadZipfile) as e:
            # A corrupt archive is reported like any other unpack failure
            raise ExtensionError(u"Failed to write and unzip plugin", e)

        chmod = "find {0} -type f | xargs chmod u+x".format(self.get_base_dir())
        shellutil.run(chmod)
        self.report_event(message="Download succeeded")

        self.logger.info("Initialize extension directory")
        # Save HandlerManifest.json
        man_file = fileutil.search_file(self.get_base_dir(),
                                        'HandlerManifest.json')

        if man_file is None:
            raise ExtensionError("HandlerManifest.json not found")

        try:
            man = fileutil.read_file(man_file, remove_bom=True)
            fileutil.write_file(self.get_manifest_file(), man)
        except IOError as e:
            raise ExtensionError(u"Failed to save HandlerManifest.json", e)

        # Create status and config dir
        try:
            status_dir = self.get_status_dir()
            fileutil.mkdir(status_dir, mode=0o700)
            conf_dir = self.get_conf_dir()
            fileutil.mkdir(conf_dir, mode=0o700)
        except IOError as e:
            raise ExtensionError(u"Failed to create status or config dir", e)

        # Save HandlerEnvironment.json
        self.create_handler_env()

    def enable(self):
        """Run the manifest's enable command and mark the handler Enabled / Ready."""
        self.logger.info("Enable extension.")
        self.set_operation(WALAEventOperation.Enable)

        man = self.load_manifest()
        self.launch_command(man.get_enable_command(), timeout=300)
        self.set_handler_state(ExtHandlerState.Enabled)
        self.set_handler_status(status="Ready", message="Plugin enabled")

    def disable(self):
        """Run the manifest's disable command and mark the handler Installed / NotReady."""
        self.logger.info("Disable extension.")
        self.set_operation(WALAEventOperation.Disable)

        man = self.load_manifest()
        self.launch_command(man.get_disable_command(), timeout=900)
        self.set_handler_state(ExtHandlerState.Installed)
        self.set_handler_status(status="NotReady", message="Plugin disabled")

    def install(self):
        """Run the manifest's install command and mark the handler Installed."""
        self.logger.info("Install extension.")
        self.set_operation(WALAEventOperation.Install)

        man = self.load_manifest()
        self.launch_command(man.get_install_command(), timeout=900)
        self.set_handler_state(ExtHandlerState.Installed)

    def uninstall(self):
        """
        Run the manifest's uninstall command.

        Best effort: an ExtensionError is reported as a failed event and not
        re-raised.
        """
        self.logger.info("Uninstall extension.")
        self.set_operation(WALAEventOperation.UnInstall)

        try:
            man = self.load_manifest()
            self.launch_command(man.get_uninstall_command())
        except ExtensionError as e:
            self.report_event(message=ustr(e), is_success=False)

    def rm_ext_handler_dir(self):
        """Remove the handler's state dir and base dir; failures are reported as events."""
        try:
            handler_state_dir = self.get_handler_state_dir()
            if os.path.isdir(handler_state_dir):
                self.logger.info("Remove ext handler dir: {0}", handler_state_dir)
                shutil.rmtree(handler_state_dir)
            base_dir = self.get_base_dir()
            if os.path.isdir(base_dir):
                self.logger.info("Remove ext handler dir: {0}", base_dir)
                shutil.rmtree(base_dir)
        except (IOError, OSError) as e:
            # shutil.rmtree raises OSError, which is distinct from IOError
            # on Python 2
            message = "Failed to rm ext handler dir: {0}".format(e)
            self.report_event(message=message, is_success=False)

    def update(self):
        """Run the manifest's update command."""
        self.logger.info("Update extension.")
        self.set_operation(WALAEventOperation.Update)

        man = self.load_manifest()
        self.launch_command(man.get_update_command(), timeout=900)

    def update_with_install(self):
        """Install during an upgrade if the manifest asks for it; mark Installed."""
        man = self.load_manifest()
        if man.is_update_with_install():
            self.install()
        else:
            self.logger.info("UpdateWithInstall not set. "
                             "Skip install during upgrade.")
        self.set_handler_state(ExtHandlerState.Installed)

    def get_largest_seq_no(self):
        """
        Return the largest N among the "N.settings" files in the config dir,
        or -1 when there is none.
        """
        seq_no = -1
        conf_dir = self.get_conf_dir()
        for item in os.listdir(conf_dir):
            item_path = os.path.join(conf_dir, item)
            if not os.path.isfile(item_path):
                continue
            seperator = item.rfind(".")
            if seperator <= 0 or item[seperator + 1:] != 'settings':
                continue
            try:
                curr_seq_no = int(item.split('.')[0])
            except ValueError:
                self.logger.verbose("Failed to parse file name: {0}", item)
                continue
            if curr_seq_no > seq_no:
                seq_no = curr_seq_no
        return seq_no

    def collect_ext_status(self, ext):
        """
        Read the status file matching the largest settings sequence number.

        Returns None when there is no settings file; otherwise an
        ExtensionStatus, set to an error status when the status file cannot
        be read or is not valid JSON. `ext` is currently unused.
        """
        self.logger.verbose("Collect extension status")

        seq_no = self.get_largest_seq_no()
        if seq_no == -1:
            return None

        status_dir = self.get_status_dir()
        ext_status_file = "{0}.status".format(seq_no)
        ext_status_file = os.path.join(status_dir, ext_status_file)

        ext_status = ExtensionStatus(seq_no=seq_no)
        try:
            data_str = fileutil.read_file(ext_status_file)
            data = json.loads(data_str)
            parse_ext_status(ext_status, data)
        except IOError as e:
            ext_status.message = u"Failed to get status file {0}".format(e)
            ext_status.code = -1
            ext_status.status = "error"
        except ValueError as e:
            ext_status.message = u"Malformed status file {0}".format(e)
            ext_status.code = -1
            ext_status.status = "error"

        return ext_status

    def report_ext_status(self):
        """
        Report the status of every extension of this handler.

        Returns the names of the extensions whose status was reported
        successfully; a ProtocolError for one extension is logged and the
        remaining extensions are still processed.
        """
        active_exts = []
        for ext in self.ext_handler.properties.extensions:
            ext_status = self.collect_ext_status(ext)
            if ext_status is None:
                continue
            try:
                self.protocol.report_ext_status(self.ext_handler.name, ext.name,
                                                ext_status)
                active_exts.append(ext.name)
            except ProtocolError as e:
                self.logger.error(u"Failed to report extension status: {0}", e)
        return active_exts

    def collect_heartbeat(self):
        """
        Return the handler's heartbeat mapping.

        Returns None when the manifest does not request heartbeat reporting,
        and an "Unresponsive" mapping when the heartbeat file is stale.
        Raises ExtensionError when the file is missing, unreadable or
        malformed.
        """
        man = self.load_manifest()
        if not man.is_report_heartbeat():
            return
        heartbeat_file = os.path.join(conf.get_lib_dir(),
                                      self.get_heartbeat_file())

        self.logger.info("Collect heart beat")
        if not os.path.isfile(heartbeat_file):
            raise ExtensionError("Failed to get heart beat file")
        if not self.is_responsive(heartbeat_file):
            return {
                "status": "Unresponsive",
                "code": -1,
                "message": "Extension heartbeat is not responsive"
            }
        try:
            heartbeat_json = fileutil.read_file(heartbeat_file)
            heartbeat = json.loads(heartbeat_json)[0]['heartbeat']
        except IOError as e:
            raise ExtensionError("Failed to get heartbeat file:{0}".format(e))
        except (ValueError, IndexError, KeyError, TypeError) as e:
            # Valid JSON of the wrong shape is malformed as well
            raise ExtensionError("Malformed heartbeat file: {0}".format(e))
        return heartbeat

    def is_responsive(self, heartbeat_file):
        """
        Return True when `heartbeat_file` was modified within the last
        10 minutes, False when it has not been updated for longer.
        """
        last_update = int(time.time() - os.stat(heartbeat_file).st_mtime)
        return last_update <= 600

    def launch_command(self, cmd, timeout=300):
        """
        Run `cmd` (relative to the handler base dir) through the shell and
        wait up to `timeout` seconds for it to finish.

        Raises ExtensionError when the command cannot be started, is still
        running after the timeout (it is killed), or exits non-zero.
        """
        self.logger.info("Launch command:{0}", cmd)
        base_dir = self.get_base_dir()
        devnull = None
        try:
            try:
                devnull = open(os.devnull, 'w')
                child = subprocess.Popen(base_dir + "/" + cmd,
                                         shell=True,
                                         cwd=base_dir,
                                         stdout=devnull)
            except Exception as e:
                # TODO do not catch all exception
                raise ExtensionError("Failed to launch: {0}, {1}".format(cmd, e))

            retry = timeout
            while retry > 0 and child.poll() is None:
                time.sleep(1)
                retry -= 1
            # Decide on the process state, not on the counter: the child may
            # have exited during the final sleep.
            if child.poll() is None:
                os.kill(child.pid, 9)
                child.wait()  # reap the killed process
                raise ExtensionError("Timeout({0}): {1}".format(timeout, cmd))

            ret = child.wait()
        finally:
            if devnull is not None:
                devnull.close()

        if ret is None or ret != 0:
            raise ExtensionError("Non-zero exit code: {0}, {1}".format(ret, cmd))

        self.report_event(message="Launch command succeeded: {0}".format(cmd))

    def load_manifest(self):
        """
        Load HandlerManifest.json and return a HandlerManifest for its first
        entry. Raises ExtensionError when the file is unreadable or malformed.
        """
        man_file = self.get_manifest_file()
        try:
            data = json.loads(fileutil.read_file(man_file))
        except IOError as e:
            raise ExtensionError('Failed to load manifest file.')
        except ValueError as e:
            raise ExtensionError('Malformed manifest file.')

        try:
            first_entry = data[0]
        except (IndexError, KeyError, TypeError):
            # Valid JSON that is not a non-empty list
            raise ExtensionError('Malformed manifest file.')
        return HandlerManifest(first_entry)

    def update_settings_file(self, settings_file, settings):
        """Write `settings` to `settings_file` inside the config dir."""
        settings_file = os.path.join(self.get_conf_dir(), settings_file)
        try:
            fileutil.write_file(settings_file, settings)
        except IOError as e:
            raise ExtensionError(u"Failed to update settings file", e)

    def update_settings(self):
        """
        Write one "<sequenceNumber>.settings" file per extension, or an empty
        "0.settings" when the handler has no extensions.
        """
        if self.ext_handler.properties.extensions is None or \
                len(self.ext_handler.properties.extensions) == 0:
            # This is the behavior of waagent 2.0.x
            # The new agent has to be consistent with the old one.
            self.logger.info("Extension has no settings, write empty 0.settings")
            self.update_settings_file("0.settings", "")
            return

        for ext in self.ext_handler.properties.extensions:
            settings = {
                'publicSettings': ext.publicSettings,
                'protectedSettings': ext.protectedSettings,
                'protectedSettingsCertThumbprint': ext.certificateThumbprint
            }
            ext_settings = {
                "runtimeSettings": [{
                    "handlerSettings": settings
                }]
            }
            settings_file = "{0}.settings".format(ext.sequenceNumber)
            self.logger.info("Update settings file: {0}", settings_file)
            self.update_settings_file(settings_file, json.dumps(ext_settings))

    def create_handler_env(self):
        """Write HandlerEnvironment.json describing this handler's folders."""
        env = [{
            "name": self.ext_handler.name,
            "version": HANDLER_ENVIRONMENT_VERSION,
            "handlerEnvironment": {
                "logFolder": self.get_log_dir(),
                "configFolder": self.get_conf_dir(),
                "statusFolder": self.get_status_dir(),
                "heartbeatFile": self.get_heartbeat_file()
            }
        }]
        try:
            fileutil.write_file(self.get_env_file(), json.dumps(env))
        except IOError as e:
            raise ExtensionError(u"Failed to save handler environment", e)

    def get_handler_state_dir(self):
        """Return "<lib dir>/handler_state/<name>-<version>"."""
        return os.path.join(conf.get_lib_dir(), "handler_state",
                            self.get_full_name())

    def _ensure_handler_state_dir(self):
        """Create the handler state dir if missing (errors are logged) and return it."""
        state_dir = self.get_handler_state_dir()
        if not os.path.exists(state_dir):
            try:
                fileutil.mkdir(state_dir, 0o700)
            except IOError as e:
                self.logger.error("Failed to create state dir: {0}", e)
        return state_dir

    def set_handler_state(self, handler_state):
        """Persist `handler_state` to the state file; failures are logged only."""
        state_dir = self._ensure_handler_state_dir()

        try:
            state_file = os.path.join(state_dir, "state")
            fileutil.write_file(state_file, handler_state)
        except IOError as e:
            self.logger.error("Failed to set state: {0}", e)

    def get_handler_state(self):
        """
        Return the content of the state file, or ExtHandlerState.NotInstalled
        when the file is missing or unreadable.
        """
        state_dir = self.get_handler_state_dir()
        state_file = os.path.join(state_dir, "state")
        if not os.path.isfile(state_file):
            return ExtHandlerState.NotInstalled

        try:
            return fileutil.read_file(state_file)
        except IOError as e:
            self.logger.error("Failed to get state: {0}", e)
            return ExtHandlerState.NotInstalled

    def set_handler_status(self, status="NotReady", message="",
                           code=0):
        """Persist an ExtHandlerStatus as JSON to the status file; failures are logged only."""
        state_dir = self._ensure_handler_state_dir()

        handler_status = ExtHandlerStatus()
        handler_status.name = self.ext_handler.name
        handler_status.version = self.ext_handler.properties.version
        handler_status.message = message
        handler_status.code = code
        handler_status.status = status
        status_file = os.path.join(state_dir, "status")

        try:
            fileutil.write_file(status_file,
                                json.dumps(get_properties(handler_status)))
        except (IOError, ValueError, ProtocolError) as e:
            self.logger.error("Failed to save handler status: {0}", e)

    def get_handler_status(self):
        """
        Return the stored ExtHandlerStatus, or None when the status file is
        missing, unreadable or not valid JSON.
        """
        state_dir = self.get_handler_state_dir()
        status_file = os.path.join(state_dir, "status")
        if not os.path.isfile(status_file):
            return None

        try:
            data = json.loads(fileutil.read_file(status_file))
            handler_status = ExtHandlerStatus()
            set_properties("ExtHandlerStatus", handler_status, data)
            return handler_status
        except (IOError, ValueError) as e:
            self.logger.error("Failed to get handler status: {0}", e)
            return None

    def get_full_name(self):
        """Return "<name>-<version>"."""
        return "{0}-{1}".format(self.ext_handler.name,
                                self.ext_handler.properties.version)

    def get_base_dir(self):
        """Return "<lib dir>/<name>-<version>"."""
        return os.path.join(conf.get_lib_dir(), self.get_full_name())

    def get_status_dir(self):
        """Return the "status" dir under the base dir."""
        return os.path.join(self.get_base_dir(), "status")

    def get_conf_dir(self):
        """Return the "config" dir under the base dir."""
        return os.path.join(self.get_base_dir(), 'config')

    def get_heartbeat_file(self):
        """Return the path of "heartbeat.log" under the base dir."""
        return os.path.join(self.get_base_dir(), 'heartbeat.log')

    def get_manifest_file(self):
        """Return the path of "HandlerManifest.json" under the base dir."""
        return os.path.join(self.get_base_dir(), 'HandlerManifest.json')

    def get_env_file(self):
        """Return the path of "HandlerEnvironment.json" under the base dir."""
        return os.path.join(self.get_base_dir(), 'HandlerEnvironment.json')

    def get_log_dir(self):
        """Return "<ext log dir>/<name>/<version>"."""
        return os.path.join(conf.get_ext_log_dir(), self.ext_handler.name,
                            self.ext_handler.properties.version)
+
class HandlerEnvironment(object):
    """Accessors over one parsed entry of HandlerEnvironment.json."""
    def __init__(self, data):
        self.data = data

    def _env_entry(self, key):
        # All folder / file settings live under the "handlerEnvironment" key
        section = self.data["handlerEnvironment"]
        return section[key]

    def get_version(self):
        """Return the top-level "version" value."""
        return self.data["version"]

    def get_log_dir(self):
        """Return the "logFolder" setting."""
        return self._env_entry("logFolder")

    def get_conf_dir(self):
        """Return the "configFolder" setting."""
        return self._env_entry("configFolder")

    def get_status_dir(self):
        """Return the "statusFolder" setting."""
        return self._env_entry("statusFolder")

    def get_heartbeat_file(self):
        """Return the "heartbeatFile" setting."""
        return self._env_entry("heartbeatFile")
+
class HandlerManifest(object):
    """
    Accessors over one parsed entry of HandlerManifest.json.

    Raises ExtensionError on construction when the entry is None or has no
    'handlerManifest' section.
    """
    def __init__(self, data):
        # .get() so that a missing key is reported as a malformed manifest
        # instead of escaping as KeyError
        if data is None or data.get('handlerManifest') is None:
            raise ExtensionError('Malformed manifest file.')
        self.data = data

    def get_name(self):
        """Return the top-level "name" value."""
        return self.data["name"]

    def get_version(self):
        """Return the top-level "version" value."""
        return self.data["version"]

    def get_install_command(self):
        """Return the "installCommand" of the handler manifest."""
        return self.data['handlerManifest']["installCommand"]

    def get_uninstall_command(self):
        """Return the "uninstallCommand" of the handler manifest."""
        return self.data['handlerManifest']["uninstallCommand"]

    def get_update_command(self):
        """Return the "updateCommand" of the handler manifest."""
        return self.data['handlerManifest']["updateCommand"]

    def get_enable_command(self):
        """Return the "enableCommand" of the handler manifest."""
        return self.data['handlerManifest']["enableCommand"]

    def get_disable_command(self):
        """Return the "disableCommand" of the handler manifest."""
        return self.data['handlerManifest']["disableCommand"]

    def is_reboot_after_install(self):
        """
        Deprecated
        """
        return False

    def is_report_heartbeat(self):
        """Return the "reportHeartbeat" flag, False when absent."""
        return self.data['handlerManifest'].get('reportHeartbeat', False)

    def is_update_with_install(self):
        """
        Return True when "updateMode" is absent or equals
        "UpdateWithInstall" (compared case-insensitively).
        """
        update_mode = self.data['handlerManifest'].get('updateMode')
        if update_mode is None:
            return True
        # str has no .low(); the previous call raised AttributeError
        return update_mode.lower() == "updatewithinstall"
diff --git a/azurelinuxagent/ga/monitor.py b/azurelinuxagent/ga/monitor.py
new file mode 100644
index 0000000..f49cef8
--- /dev/null
+++ b/azurelinuxagent/ga/monitor.py
@@ -0,0 +1,192 @@
+# Copyright 2014 Microsoft Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Requires Python 2.4+ and Openssl 1.0+
+#
+
+import datetime
+import json
+import os
+import platform
+import time
+import threading
+
+import azurelinuxagent.common.conf as conf
+import azurelinuxagent.common.logger as logger
+
+from azurelinuxagent.common.event import WALAEventOperation, add_event
+from azurelinuxagent.common.exception import EventError, ProtocolError, OSUtilError
+from azurelinuxagent.common.future import ustr
+from azurelinuxagent.common.osutil import get_osutil
+from azurelinuxagent.common.protocol import get_protocol_util
+from azurelinuxagent.common.protocol.restapi import TelemetryEventParam, \
+ TelemetryEventList, \
+ TelemetryEvent, \
+ set_properties
+from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, getattrib
+from azurelinuxagent.common.version import DISTRO_NAME, DISTRO_VERSION, \
+ DISTRO_CODE_NAME, AGENT_LONG_VERSION, \
+ CURRENT_AGENT, CURRENT_VERSION
+
+
+def parse_event(data_str):
+    """
+    Parse an event string, trying the JSON form first and falling back to
+    the XML form when the JSON parser raises ValueError.
+    """
+    try:
+        event = parse_json_event(data_str)
+    except ValueError:
+        event = parse_xml_event(data_str)
+    return event
+
+
+def parse_xml_param(param_node):
+    """
+    Convert an XML <Param> node into a TelemetryEventParam.
+
+    The node's "Name" and "Value" attributes supply the parameter; the "T"
+    attribute selects the conversion applied to the value:
+      - 'mt:uint64'  -> int
+      - 'mt:bool'    -> bool ("true"/"1", case-insensitive, are True)
+      - 'mt:float64' -> float
+      - anything else leaves the value as the attribute string
+    """
+    name = getattrib(param_node, "Name")
+    value_str = getattrib(param_node, "Value")
+    attr_type = getattrib(param_node, "T")
+    value = value_str
+    if attr_type == 'mt:uint64':
+        value = int(value_str)
+    elif attr_type == 'mt:bool':
+        # bool() of any non-empty string is True, so "False" used to be
+        # decoded as True; compare against the textual truth values instead.
+        value = (value_str is not None
+                 and value_str.strip().lower() in ("true", "1"))
+    elif attr_type == 'mt:float64':
+        value = float(value_str)
+    return TelemetryEventParam(name, value)
+
+
+def parse_xml_event(data_str):
+    """
+    Build a TelemetryEvent from an XML event string.
+
+    The event id comes from the "Event" node, the provider id from the
+    "Provider" node, and one parameter is added per "Param" node.
+    Any failure is re-raised as ValueError.
+    """
+    try:
+        doc = parse_doc(data_str)
+        event_node = find(doc, "Event")
+        event_id = getattrib(event_node, 'id')
+        provider_node = find(doc, "Provider")
+        provider_id = getattrib(provider_node, 'id')
+        telemetry_event = TelemetryEvent(event_id, provider_id)
+        telemetry_event.parameters.extend(
+            [parse_xml_param(node) for node in findall(doc, 'Param')])
+        return telemetry_event
+    except Exception as e:
+        raise ValueError(ustr(e))
+
+
+def parse_json_event(data_str):
+    """
+    Build a TelemetryEvent from a JSON event string.
+
+    json.loads raises ValueError on malformed input, which parse_event
+    relies on to fall back to the XML parser.
+    """
+    payload = json.loads(data_str)
+    telemetry_event = TelemetryEvent()
+    set_properties("TelemetryEvent", telemetry_event, payload)
+    return telemetry_event
+
+
+def get_monitor_handler():
+    """Create and return a new MonitorHandler instance."""
+    handler = MonitorHandler()
+    return handler
+
+
+class MonitorHandler(object):
+    """
+    Collects telemetry event files from the agent's "events" directory and
+    reports them through the protocol, from a background daemon thread.
+    """
+
+    def __init__(self):
+        self.osutil = get_osutil()
+        self.protocol_util = get_protocol_util()
+        # TelemetryEventParam entries appended to every reported event
+        self.sysinfo = []
+
+    def run(self):
+        """Gather system information, then start the daemon thread."""
+        self.init_sysinfo()
+
+        event_thread = threading.Thread(target=self.daemon)
+        # Thread.setDaemon() is deprecated; the attribute is the supported form
+        event_thread.daemon = True
+        event_thread.start()
+
+    def init_sysinfo(self):
+        """
+        Populate self.sysinfo with OS, agent, hardware and VM identity
+        parameters. Failures to read hardware or VM information are logged
+        and the corresponding parameters are simply omitted.
+        """
+        osversion = "{0}:{1}-{2}-{3}:{4}".format(platform.system(),
+                                                 DISTRO_NAME,
+                                                 DISTRO_VERSION,
+                                                 DISTRO_CODE_NAME,
+                                                 platform.release())
+        self.sysinfo.append(TelemetryEventParam("OSVersion", osversion))
+        self.sysinfo.append(
+            TelemetryEventParam("GAVersion", CURRENT_AGENT))
+
+        try:
+            ram = self.osutil.get_total_mem()
+            processors = self.osutil.get_processor_cores()
+            self.sysinfo.append(TelemetryEventParam("RAM", ram))
+            self.sysinfo.append(TelemetryEventParam("Processors", processors))
+        except OSUtilError as e:
+            logger.warn("Failed to get system info: {0}", e)
+
+        try:
+            protocol = self.protocol_util.get_protocol()
+            vminfo = protocol.get_vminfo()
+            self.sysinfo.append(TelemetryEventParam("VMName",
+                                                    vminfo.vmName))
+            self.sysinfo.append(TelemetryEventParam("TenantName",
+                                                    vminfo.tenantName))
+            self.sysinfo.append(TelemetryEventParam("RoleName",
+                                                    vminfo.roleName))
+            self.sysinfo.append(TelemetryEventParam("RoleInstanceName",
+                                                    vminfo.roleInstanceName))
+            self.sysinfo.append(TelemetryEventParam("ContainerId",
+                                                    vminfo.containerId))
+        except ProtocolError as e:
+            logger.warn("Failed to get system info: {0}", e)
+
+    def collect_event(self, evt_file_name):
+        """
+        Read an event file, delete it, and return its content decoded as
+        UTF-8 (undecodable bytes are ignored).
+
+        Raises EventError when the file cannot be opened, read or deleted.
+        """
+        try:
+            logger.verbose("Found event file: {0}", evt_file_name)
+            with open(evt_file_name, "rb") as evt_file:
+                # if fail to open or delete the file, throw exception
+                data_str = evt_file.read().decode("utf-8", 'ignore')
+                logger.verbose("Processed event file: {0}", evt_file_name)
+                os.remove(evt_file_name)
+                return data_str
+        except (IOError, OSError) as e:
+            # os.remove raises OSError, which is not an IOError on Python 2;
+            # catching both keeps a failed delete from escaping as a raw
+            # OSError and aborting the whole collection pass.
+            msg = "Failed to process {0}, {1}".format(evt_file_name, e)
+            raise EventError(msg)
+
+    def collect_and_send_events(self):
+        """
+        Collect every "*.tld" file in the events directory, parse each into
+        a telemetry event (extended with the system information), and report
+        the resulting list. Files that cannot be read or decoded are logged
+        and skipped; nothing is reported when no event was collected.
+        """
+        event_list = TelemetryEventList()
+        event_dir = os.path.join(conf.get_lib_dir(), "events")
+        event_files = os.listdir(event_dir)
+        for event_file in event_files:
+            if not event_file.endswith(".tld"):
+                continue
+            event_file_path = os.path.join(event_dir, event_file)
+            try:
+                data_str = self.collect_event(event_file_path)
+            except EventError as e:
+                logger.error("{0}", e)
+                continue
+
+            try:
+                event = parse_event(data_str)
+                event.parameters.extend(self.sysinfo)
+                event_list.events.append(event)
+            except (ValueError, ProtocolError) as e:
+                logger.warn("Failed to decode event file: {0}", e)
+                continue
+
+        if len(event_list.events) == 0:
+            return
+
+        try:
+            protocol = self.protocol_util.get_protocol()
+            protocol.report_event(event_list)
+        except ProtocolError as e:
+            logger.error("{0}", e)
+
+    def daemon(self):
+        """
+        Thread body: emit a heartbeat event every 30 minutes and collect and
+        send pending events once a minute, forever.
+        """
+        # datetime.min guarantees a heartbeat on the first iteration
+        last_heartbeat = datetime.datetime.min
+        period = datetime.timedelta(minutes=30)
+        while True:
+            if (datetime.datetime.now() - last_heartbeat) > period:
+                last_heartbeat = datetime.datetime.now()
+                add_event(op=WALAEventOperation.HeartBeat, name="WALA",
+                          is_success=True)
+            try:
+                self.collect_and_send_events()
+            except Exception as e:
+                # Keep the thread alive whatever a single pass raises
+                logger.warn("Failed to send events: {0}", e)
+            time.sleep(60)
diff --git a/azurelinuxagent/ga/update.py b/azurelinuxagent/ga/update.py
new file mode 100644
index 0000000..e89608a
--- /dev/null
+++ b/azurelinuxagent/ga/update.py
@@ -0,0 +1,715 @@
+# Windows Azure Linux Agent
+#
+# Copyright 2014 Microsoft Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Requires Python 2.4+ and Openssl 1.0+
+#
+import glob
+import json
+import os
+import platform
+import re
+import shlex
+import shutil
+import signal
+import subprocess
+import sys
+import time
+import zipfile
+
+import azurelinuxagent.common.conf as conf
+import azurelinuxagent.common.logger as logger
+import azurelinuxagent.common.utils.fileutil as fileutil
+import azurelinuxagent.common.utils.restutil as restutil
+import azurelinuxagent.common.utils.textutil as textutil
+
+from azurelinuxagent.common.event import add_event, WALAEventOperation
+from azurelinuxagent.common.exception import UpdateError, ProtocolError
+from azurelinuxagent.common.future import ustr
+from azurelinuxagent.common.osutil import get_osutil
+from azurelinuxagent.common.protocol import get_protocol_util
+from azurelinuxagent.common.utils.flexible_version import FlexibleVersion
+from azurelinuxagent.common.version import AGENT_NAME, AGENT_VERSION, AGENT_LONG_VERSION, \
+ AGENT_DIR_GLOB, AGENT_PKG_GLOB, \
+ AGENT_PATTERN, AGENT_NAME_PATTERN, AGENT_DIR_PATTERN, \
+ CURRENT_AGENT, CURRENT_VERSION, \
+ is_current_agent_installed
+
+from azurelinuxagent.ga.exthandlers import HandlerManifest
+
+
+# File names looked up inside an agent's directory (see GuestAgent)
+AGENT_ERROR_FILE = "error.json" # File name for agent error record
+AGENT_MANIFEST_FILE = "HandlerManifest.json"
+
+# Child agent supervision, all in seconds except the restart count:
+# - CHILD_HEALTH_INTERVAL: how long run_latest polls a freshly launched child
+# - CHILD_LAUNCH_INTERVAL / CHILD_LAUNCH_RESTART_MAX: _evaluate_agent_health
+#   raises once this many launches happen within this interval
+# - CHILD_POLL_INTERVAL: sleep between two polls of the child process
+CHILD_HEALTH_INTERVAL = 15 * 60
+CHILD_LAUNCH_INTERVAL = 5 * 60
+CHILD_LAUNCH_RESTART_MAX = 3
+CHILD_POLL_INTERVAL = 60
+
+MAX_FAILURE = 3 # Max failure allowed for agent before blacklisted
+
+# NOTE(review): presumably polling/reporting periods in seconds - confirm
+# against the code that consumes them.
+GOAL_STATE_INTERVAL = 25
+REPORT_STATUS_INTERVAL = 15
+RETAIN_INTERVAL = 24 * 60 * 60 # Retain interval for black list
+
+
+def get_update_handler():
+    """Create and return a new UpdateHandler instance."""
+    handler = UpdateHandler()
+    return handler
+
+
+def get_python_cmd():
+    """
+    Return the interpreter command matching the running Python: "python"
+    for major version 2 or lower, otherwise "python<major>".
+    """
+    major_version = platform.python_version_tuple()[0]
+    if int(major_version) <= 2:
+        return "python"
+    return "python{0}".format(major_version)
+
+
+class UpdateHandler(object):
+    """
+    Finds, launches and supervises the most current downloaded agent
+    (run_latest) and, when running as that agent, drives the main loop that
+    watches for agent and extension updates (run).
+    """
+
+    def __init__(self):
+        self.osutil = get_osutil()
+        self.protocol_util = get_protocol_util()
+
+        self.running = True
+        # NOTE(review): last_etag is compared in _ensure_latest_agent but is
+        # never assigned after construction, so that comparison never
+        # matches - confirm whether it should be recorded after a check.
+        self.last_etag = None
+        self.last_attempt_time = None
+
+        # Known agents, kept sorted newest-first by _set_agents
+        self.agents = []
+
+        # State used by _evaluate_agent_health to detect rapid restarts
+        self.child_agent = None
+        self.child_launch_time = None
+        self.child_launch_attempts = 0
+        self.child_process = None
+
+        # Previous SIGTERM handler, saved when forward_signal is installed
+        self.signal_handler = None
+        return
+
+    def run_latest(self):
+        """
+        This method is called from the daemon to find and launch the most
+        current, downloaded agent.
+
+        Note:
+        - Most events should be tagged to the launched agent (agent_version)
+        """
+
+        if self.child_process is not None:
+            raise Exception("Illegal attempt to launch multiple goal state Agent processes")
+
+        if self.signal_handler is None:
+            self.signal_handler = signal.signal(signal.SIGTERM, self.forward_signal)
+
+        latest_agent = self.get_latest_agent()
+        if latest_agent is None:
+            logger.info(u"Installed Agent {0} is the most current agent", CURRENT_AGENT)
+            agent_cmd = "python -u {0} -run-exthandlers".format(sys.argv[0])
+            agent_dir = os.getcwd()
+            agent_name = CURRENT_AGENT
+            agent_version = CURRENT_VERSION
+        else:
+            logger.info(u"Determined Agent {0} to be the latest agent", latest_agent.name)
+            agent_cmd = latest_agent.get_agent_cmd()
+            agent_dir = latest_agent.get_agent_dir()
+            agent_name = latest_agent.name
+            agent_version = latest_agent.version
+
+        try:
+
+            # Launch the correct Python version for python-based agents
+            cmds = shlex.split(agent_cmd)
+            if cmds[0].lower() == "python":
+                cmds[0] = get_python_cmd()
+                agent_cmd = " ".join(cmds)
+
+            # Raises when the agent restarted too often; the except clause
+            # below then blacklists it.
+            self._evaluate_agent_health(latest_agent)
+
+            self.child_process = subprocess.Popen(
+                cmds,
+                cwd=agent_dir,
+                stdout=sys.stdout,
+                stderr=sys.stderr)
+
+            logger.info(u"Agent {0} launched with command '{1}'", agent_name, agent_cmd)
+
+            # Watch the child for CHILD_HEALTH_INTERVAL seconds; ret stays
+            # None while it is still running.
+            ret = None
+            start_time = time.time()
+            while (time.time() - start_time) < CHILD_HEALTH_INTERVAL:
+                time.sleep(CHILD_POLL_INTERVAL)
+                ret = self.child_process.poll()
+                if ret is not None:
+                    break
+
+            if ret is None or ret <= 0:
+                msg = u"Agent {0} launched with command '{1}' is successfully running".format(
+                    agent_name,
+                    agent_cmd)
+                logger.info(msg)
+                add_event(
+                    AGENT_NAME,
+                    version=agent_version,
+                    op=WALAEventOperation.Enable,
+                    is_success=True,
+                    message=msg)
+
+                # Still running after the health window: block until it exits
+                if ret is None:
+                    ret = self.child_process.wait()
+
+            else:
+                msg = u"Agent {0} launched with command '{1}' failed with return code: {2}".format(
+                    agent_name,
+                    agent_cmd,
+                    ret)
+                logger.warn(msg)
+                add_event(
+                    AGENT_NAME,
+                    version=agent_version,
+                    op=WALAEventOperation.Enable,
+                    is_success=False,
+                    message=msg)
+
+            if ret is not None and ret > 0:
+                msg = u"Agent {0} launched with command '{1}' returned code: {2}".format(
+                    agent_name,
+                    agent_cmd,
+                    ret)
+                logger.warn(msg)
+                if latest_agent is not None:
+                    latest_agent.mark_failure()
+
+        except Exception as e:
+            msg = u"Agent {0} launched with command '{1}' failed with exception: {2}".format(
+                agent_name,
+                agent_cmd,
+                ustr(e))
+            logger.warn(msg)
+            add_event(
+                AGENT_NAME,
+                version=agent_version,
+                op=WALAEventOperation.Enable,
+                is_success=False,
+                message=msg)
+            if latest_agent is not None:
+                latest_agent.mark_failure(is_fatal=True)
+
+        self.child_process = None
+        return
+
+    def run(self):
+        """
+        This is the main loop which watches for agent and extension updates.
+        """
+
+        logger.info(u"Agent {0} is running as the goal state agent", CURRENT_AGENT)
+
+        # Launch monitoring threads
+        from azurelinuxagent.ga.monitor import get_monitor_handler
+        get_monitor_handler().run()
+
+        from azurelinuxagent.ga.env import get_env_handler
+        get_env_handler().run()
+
+        from azurelinuxagent.ga.exthandlers import get_exthandlers_handler
+        exthandlers_handler = get_exthandlers_handler()
+
+        # TODO: Add means to stop running
+        try:
+            while self.running:
+                if self._ensure_latest_agent():
+                    if len(self.agents) > 0:
+                        logger.info(
+                            u"Agent {0} discovered {1} as an update and will exit",
+                            CURRENT_AGENT,
+                            self.agents[0].name)
+                    break
+
+                exthandlers_handler.run()
+
+                # Use the declared constant rather than a duplicated literal
+                time.sleep(GOAL_STATE_INTERVAL)
+
+        except Exception as e:
+            logger.warn(u"Agent {0} failed with exception: {1}", CURRENT_AGENT, ustr(e))
+            sys.exit(1)
+
+        sys.exit(0)
+        return
+
+    def forward_signal(self, signum, frame):
+        """
+        Signal handler: forward the signal to the child process (if any),
+        then chain to the previously installed handler. When that handler
+        was the default one, exit on SIGTERM.
+        """
+        if self.child_process is None:
+            return
+
+        logger.info(
+            u"Agent {0} forwarding signal {1} to {2}",
+            CURRENT_AGENT,
+            signum,
+            self.child_agent.name if self.child_agent is not None else CURRENT_AGENT)
+        self.child_process.send_signal(signum)
+
+        if self.signal_handler not in (None, signal.SIG_IGN, signal.SIG_DFL):
+            self.signal_handler(signum, frame)
+        elif self.signal_handler is signal.SIG_DFL:
+            if signum == signal.SIGTERM:
+                sys.exit(0)
+        return
+
+    def get_latest_agent(self):
+        """
+        If autoupdate is enabled, return the most current, downloaded,
+        non-blacklisted agent (if any).
+        Otherwise, return None (implying to use the installed agent).
+        """
+
+        if not conf.get_autoupdate_enabled():
+            return None
+
+        self._load_agents()
+        available_agents = [agent for agent in self.agents if agent.is_available]
+        return available_agents[0] if len(available_agents) >= 1 else None
+
+    def _ensure_latest_agent(self, base_version=CURRENT_VERSION):
+        """
+        Check the agent manifests for updates, at most once per configured
+        autoupdate frequency, and refresh self.agents from the package list.
+
+        Returns True when an agent newer than base_version is available,
+        False otherwise (including when updates are disabled, the check is
+        not yet due, or the manifests/packages cannot be retrieved).
+        """
+        # Ignore new agents if updating is disabled
+        if not conf.get_autoupdate_enabled():
+            return False
+
+        now = time.time()
+        if self.last_attempt_time is not None:
+            next_attempt_time = self.last_attempt_time + conf.get_autoupdate_frequency()
+        else:
+            next_attempt_time = now
+        if next_attempt_time > now:
+            return False
+
+        family = conf.get_autoupdate_gafamily()
+        logger.info("Checking for agent family {0} updates", family)
+
+        self.last_attempt_time = now
+        try:
+            protocol = self.protocol_util.get_protocol()
+            manifest_list, etag = protocol.get_vmagent_manifests()
+        except Exception as e:
+            msg = u"Exception retrieving agent manifests: {0}".format(ustr(e))
+            logger.warn(msg)
+            add_event(
+                AGENT_NAME,
+                op=WALAEventOperation.Download,
+                version=CURRENT_VERSION,
+                is_success=False,
+                message=msg)
+            return False
+
+        if self.last_etag is not None and self.last_etag == etag:
+            logger.info(u"Incarnation {0} has no agent updates", etag)
+            return False
+
+        manifests = [m for m in manifest_list.vmAgentManifests if m.family == family]
+        if len(manifests) == 0:
+            logger.info(u"Incarnation {0} has no agent family {1} updates", etag, family)
+            return False
+
+        try:
+            pkg_list = protocol.get_vmagent_pkgs(manifests[0])
+        except ProtocolError as e:
+            msg = u"Incarnation {0} failed to get {1} package list: {2}".format(
+                etag,
+                family,
+                ustr(e))
+            logger.warn(msg)
+            add_event(
+                AGENT_NAME,
+                op=WALAEventOperation.Download,
+                version=CURRENT_VERSION,
+                is_success=False,
+                message=msg)
+            return False
+
+        # Set the agents to those available for download at least as current as the existing agent
+        # and remove from disk any agent no longer reported to the VM.
+        # Note:
+        #  The code leaves on disk available, but blacklisted, agents so as to preserve the state.
+        #  Otherwise, those agents could be again downloaded and inappropriately retried.
+        self._set_agents([GuestAgent(pkg=pkg) for pkg in pkg_list.versions])
+        self._purge_agents()
+        self._filter_blacklisted_agents()
+
+        # Return True if agents more recent than the current are available
+        return len(self.agents) > 0 and self.agents[0].version > base_version
+
+    def _evaluate_agent_health(self, latest_agent):
+        """
+        Evaluate the health of the selected agent: If it is restarting
+        too frequently, raise an Exception to force blacklisting.
+        """
+        if latest_agent is None:
+            self.child_agent = None
+            return
+
+        # A different agent version resets the launch bookkeeping
+        if self.child_agent is None or latest_agent.version != self.child_agent.version:
+            self.child_agent = latest_agent
+            self.child_launch_time = None
+            self.child_launch_attempts = 0
+
+        if self.child_launch_time is None:
+            self.child_launch_time = time.time()
+
+        self.child_launch_attempts += 1
+
+        if (time.time() - self.child_launch_time) <= CHILD_LAUNCH_INTERVAL \
+                and self.child_launch_attempts >= CHILD_LAUNCH_RESTART_MAX:
+            msg = u"Agent {0} restarted more than {1} times in {2} seconds".format(
+                self.child_agent.name,
+                CHILD_LAUNCH_RESTART_MAX,
+                CHILD_LAUNCH_INTERVAL)
+            raise Exception(msg)
+        return
+
+    def _filter_blacklisted_agents(self):
+        """Drop blacklisted agents from self.agents."""
+        self.agents = [agent for agent in self.agents if not agent.is_blacklisted]
+        return
+
+    def _load_agents(self):
+        """
+        Load all non-blacklisted agents currently on disk.
+        """
+        if len(self.agents) <= 0:
+            try:
+                path = os.path.join(conf.get_lib_dir(), "{0}-*".format(AGENT_NAME))
+                self._set_agents([GuestAgent(path=agent_dir)
+                                  for agent_dir in glob.iglob(path) if os.path.isdir(agent_dir)])
+                self._filter_blacklisted_agents()
+            except Exception as e:
+                logger.warn(u"Exception occurred loading available agents: {0}", ustr(e))
+        return
+
+    def _purge_agents(self):
+        """
+        Remove from disk all directories and .zip files of unknown agents
+        (without removing the current, running agent).
+        """
+        path = os.path.join(conf.get_lib_dir(), "{0}-*".format(AGENT_NAME))
+
+        known_versions = [agent.version for agent in self.agents]
+        if not is_current_agent_installed() and CURRENT_VERSION not in known_versions:
+            logger.warn(
+                u"Running Agent {0} was not found in the agent manifest - adding to list",
+                CURRENT_VERSION)
+            known_versions.append(CURRENT_VERSION)
+
+        for agent_path in glob.iglob(path):
+            try:
+                name = fileutil.trim_ext(agent_path, "zip")
+                m = AGENT_DIR_PATTERN.match(name)
+                if m is not None and FlexibleVersion(m.group(1)) not in known_versions:
+                    if os.path.isfile(agent_path):
+                        logger.info(u"Purging outdated Agent file {0}", agent_path)
+                        os.remove(agent_path)
+                    else:
+                        logger.info(u"Purging outdated Agent directory {0}", agent_path)
+                        shutil.rmtree(agent_path)
+            except Exception as e:
+                logger.warn(u"Purging {0} raised exception: {1}", agent_path, ustr(e))
+        return
+
+    def _set_agents(self, agents=None):
+        """
+        Replace self.agents with the given list, sorted in place by version,
+        newest first. Without an argument the list becomes empty.
+        """
+        # A mutable default ([]) would be shared - and sorted/mutated -
+        # across every call that omits the argument.
+        self.agents = [] if agents is None else agents
+        self.agents.sort(key=lambda agent: agent.version, reverse=True)
+        return
+
+
+class GuestAgent(object):
+    """
+    One agent version, identified either by its on-disk directory (path) or
+    by a downloadable package (pkg). Construction loads the recorded error
+    state and makes sure the agent is downloaded and unpacked.
+    """
+
+    def __init__(self, path=None, pkg=None):
+        """
+        Raises UpdateError when path does not match AGENT_DIR_PATTERN or
+        when no version can be determined from path or pkg.
+        """
+        self.pkg = pkg
+        version = None
+        if path is not None:
+            m = AGENT_DIR_PATTERN.match(path)
+            if m is None:
+                raise UpdateError(u"Illegal agent directory: {0}".format(path))
+            version = m.group(1)
+        elif self.pkg is not None:
+            version = pkg.version
+
+        if version is None:
+            raise UpdateError(u"Illegal agent version: {0}".format(version))
+        self.version = FlexibleVersion(version)
+
+        location = u"disk" if path is not None else u"package"
+        logger.info(u"Instantiating Agent {0} from {1}", self.name, location)
+
+        self.error = None
+        # Always defined; stays None when the agent is blacklisted or the
+        # download fails, since the manifest is then never loaded.
+        self.manifest = None
+        self._load_error()
+        self._ensure_downloaded()
+        return
+
+    @property
+    def name(self):
+        """Agent name in the form "<AGENT_NAME>-<version>"."""
+        return "{0}-{1}".format(AGENT_NAME, self.version)
+
+    def get_agent_cmd(self):
+        """Return the enable command declared in the agent's manifest."""
+        return self.manifest.get_enable_command()
+
+    def get_agent_dir(self):
+        """Return the agent's directory under the library directory."""
+        return os.path.join(conf.get_lib_dir(), self.name)
+
+    def get_agent_error_file(self):
+        """Return the path of the error record inside the agent directory."""
+        return os.path.join(conf.get_lib_dir(), self.name, AGENT_ERROR_FILE)
+
+    def get_agent_manifest_path(self):
+        """Return the path of the manifest file inside the agent directory."""
+        return os.path.join(self.get_agent_dir(), AGENT_MANIFEST_FILE)
+
+    def get_agent_pkg_path(self):
+        """Return the path of the agent's .zip package: "<agent dir>.zip"."""
+        return ".".join((os.path.join(conf.get_lib_dir(), self.name), "zip"))
+
+    def clear_error(self):
+        """Reset the in-memory error record (it is not saved here)."""
+        self.error.clear()
+        return
+
+    @property
+    def is_available(self):
+        """True when the agent is downloaded and not blacklisted."""
+        return self.is_downloaded and not self.is_blacklisted
+
+    @property
+    def is_blacklisted(self):
+        """True when an error record exists and marks the agent blacklisted."""
+        return self.error is not None and self.error.is_blacklisted
+
+    @property
+    def is_downloaded(self):
+        """
+        True when the manifest file exists on disk. A blacklisted agent is
+        also reported as downloaded.
+        """
+        return self.is_blacklisted or os.path.isfile(self.get_agent_manifest_path())
+
+    def mark_failure(self, is_fatal=False):
+        """
+        Record a failure in the error file, creating the agent directory if
+        needed. Problems while recording are logged, never raised.
+        """
+        try:
+            if not os.path.isdir(self.get_agent_dir()):
+                os.makedirs(self.get_agent_dir())
+            self.error.mark_failure(is_fatal=is_fatal)
+            self.error.save()
+            if is_fatal:
+                logger.warn(u"Agent {0} is permanently blacklisted", self.name)
+        except Exception as e:
+            logger.warn(u"Agent {0} failed recording error state: {1}", self.name, ustr(e))
+        return
+
+    def _ensure_downloaded(self):
+        """
+        Download, unpack and load the agent unless it is blacklisted or was
+        already downloaded. Failures are recorded and reported, not raised.
+        """
+        try:
+            logger.info(u"Ensuring Agent {0} is downloaded", self.name)
+
+            if self.is_blacklisted:
+                logger.info(u"Agent {0} is blacklisted - skipping download", self.name)
+                return
+
+            if self.is_downloaded:
+                logger.info(u"Agent {0} was previously downloaded - skipping download", self.name)
+                self._load_manifest()
+                return
+
+            if self.pkg is None:
+                raise UpdateError(u"Agent {0} is missing package and download URIs".format(
+                    self.name))
+
+            self._download()
+            self._unpack()
+            self._load_manifest()
+            self._load_error()
+
+            msg = u"Agent {0} downloaded successfully".format(self.name)
+            logger.info(msg)
+            add_event(
+                AGENT_NAME,
+                version=self.version,
+                op=WALAEventOperation.Install,
+                is_success=True,
+                message=msg)
+
+        except Exception as e:
+            # Note the failure, blacklist the agent if the package downloaded
+            # - An exception with a downloaded package indicates the package
+            #   is corrupt (e.g., missing the HandlerManifest.json file)
+            self.mark_failure(is_fatal=os.path.isfile(self.get_agent_pkg_path()))
+
+            msg = u"Agent {0} download failed with exception: {1}".format(self.name, ustr(e))
+            logger.warn(msg)
+            add_event(
+                AGENT_NAME,
+                version=self.version,
+                op=WALAEventOperation.Install,
+                is_success=False,
+                message=msg)
+        return
+
+    def _download(self):
+        """
+        Try each package URI in turn until one returns HTTP OK, and write
+        the payload to the package path.
+
+        Raises UpdateError when no package file exists afterwards.
+        """
+        package = None
+
+        for uri in self.pkg.uris:
+            try:
+                resp = restutil.http_get(uri.uri, chk_proxy=True)
+                if resp.status == restutil.httpclient.OK:
+                    package = resp.read()
+                    fileutil.write_file(self.get_agent_pkg_path(), bytearray(package), asbin=True)
+                    logger.info(u"Agent {0} downloaded from {1}", self.name, uri.uri)
+                    break
+            except restutil.HttpError:
+                logger.warn(u"Agent {0} download from {1} failed", self.name, uri.uri)
+
+        if not os.path.isfile(self.get_agent_pkg_path()):
+            msg = u"Unable to download Agent {0} from any URI".format(self.name)
+            add_event(
+                AGENT_NAME,
+                op=WALAEventOperation.Download,
+                version=CURRENT_VERSION,
+                is_success=False,
+                message=msg)
+            raise UpdateError(msg)
+        return
+
+    def _load_error(self):
+        """
+        Create the error record on first use and (re)load it from disk.
+        Failures are logged, never raised.
+        """
+        try:
+            if self.error is None:
+                self.error = GuestAgentError(self.get_agent_error_file())
+            self.error.load()
+            logger.info(u"Agent {0} error state: {1}", self.name, ustr(self.error))
+        except Exception as e:
+            logger.warn(u"Agent {0} failed loading error state: {1}", self.name, ustr(e))
+        return
+
+    def _load_manifest(self):
+        """
+        Load the agent's manifest file into self.manifest. When the file
+        holds a list, its first element is used.
+
+        Raises UpdateError when the file is missing, malformed, empty, or
+        lacks an enable command.
+        """
+        path = self.get_agent_manifest_path()
+        if not os.path.isfile(path):
+            msg = u"Agent {0} is missing the {1} file".format(self.name, AGENT_MANIFEST_FILE)
+            raise UpdateError(msg)
+
+        with open(path, "r") as manifest_file:
+            try:
+                manifests = json.load(manifest_file)
+            except Exception:
+                msg = u"Agent {0} has a malformed {1}".format(self.name, AGENT_MANIFEST_FILE)
+                raise UpdateError(msg)
+            if isinstance(manifests, list):
+                if len(manifests) <= 0:
+                    msg = u"Agent {0} has an empty {1}".format(self.name, AGENT_MANIFEST_FILE)
+                    raise UpdateError(msg)
+                manifest = manifests[0]
+            else:
+                manifest = manifests
+
+        try:
+            self.manifest = HandlerManifest(manifest)
+            if len(self.manifest.get_enable_command()) <= 0:
+                raise Exception(u"Manifest is missing the enable command")
+        except Exception as e:
+            msg = u"Agent {0} has an illegal {1}: {2}".format(
+                self.name,
+                AGENT_MANIFEST_FILE,
+                ustr(e))
+            raise UpdateError(msg)
+
+        logger.info(
+            u"Agent {0} loaded manifest from {1}",
+            self.name,
+            self.get_agent_manifest_path())
+        logger.verbose(u"Successfully loaded Agent {0} {1}: {2}",
+                       self.name,
+                       AGENT_MANIFEST_FILE,
+                       ustr(self.manifest.data))
+        return
+
+    def _unpack(self):
+        """
+        Extract the downloaded package into a fresh agent directory.
+
+        Raises UpdateError when extraction fails or the directory is not
+        created.
+        """
+        try:
+            if os.path.isdir(self.get_agent_dir()):
+                shutil.rmtree(self.get_agent_dir())
+
+            # Close the archive explicitly so the file handle is not leaked
+            package = zipfile.ZipFile(self.get_agent_pkg_path())
+            try:
+                package.extractall(self.get_agent_dir())
+            finally:
+                package.close()
+
+        except Exception as e:
+            msg = u"Exception unpacking Agent {0} from {1}: {2}".format(
+                self.name,
+                self.get_agent_pkg_path(),
+                ustr(e))
+            raise UpdateError(msg)
+
+        if not os.path.isdir(self.get_agent_dir()):
+            msg = u"Unpacking Agent {0} failed to create directory {1}".format(
+                self.name,
+                self.get_agent_dir())
+            raise UpdateError(msg)
+
+        logger.info(
+            u"Agent {0} unpacked successfully to {1}",
+            self.name,
+            self.get_agent_dir())
+        return
+
+
+
+
+class GuestAgentError(object):
+    """
+    Failure record of one agent: time of the last failure, number of
+    failures and whether a failure was fatal. The record is persisted as
+    JSON at the path given to the constructor.
+    """
+
+    def __init__(self, path):
+        if path is None:
+            raise UpdateError(u"GuestAgentError requires a path")
+        self.path = path
+
+        # Start from an empty record, then merge whatever is stored on disk
+        self.clear()
+        self.load()
+
+    def mark_failure(self, is_fatal=False):
+        """Count one more failure, stamped with the current time."""
+        self.failure_count += 1
+        self.last_failure = time.time()
+        self.was_fatal = is_fatal
+
+    def clear(self):
+        """Reset the in-memory record to "no failures"."""
+        self.was_fatal = False
+        self.failure_count = 0
+        self.last_failure = 0.0
+
+    def clear_old_failure(self):
+        """Reset the record when its last failure is older than RETAIN_INTERVAL."""
+        if self.last_failure > 0.0 and self.last_failure < (time.time() - RETAIN_INTERVAL):
+            self.clear()
+
+    @property
+    def is_blacklisted(self):
+        """Truthy after a fatal failure or once MAX_FAILURE failures are recorded."""
+        if self.was_fatal:
+            return self.was_fatal
+        return self.failure_count >= MAX_FAILURE
+
+    def load(self):
+        """Merge the record stored at self.path, if that file exists."""
+        if self.path is None or not os.path.isfile(self.path):
+            return
+        with open(self.path, 'r') as error_file:
+            self.from_json(json.load(error_file))
+
+    def save(self):
+        """Write the record to self.path; skipped when its directory is missing."""
+        if not os.path.isdir(os.path.dirname(self.path)):
+            return
+        with open(self.path, 'w') as error_file:
+            json.dump(self.to_json(), error_file)
+
+    def from_json(self, data):
+        """
+        Merge a stored record into this one, keeping the larger timestamp
+        and count and a sticky fatal flag.
+        """
+        stored_failure = data.get(u"last_failure", 0.0)
+        stored_count = data.get(u"failure_count", 0)
+        self.last_failure = max(self.last_failure, stored_failure)
+        self.failure_count = max(self.failure_count, stored_count)
+        self.was_fatal = self.was_fatal or data.get(u"was_fatal", False)
+
+    def to_json(self):
+        """Return the record as a JSON-serializable dict."""
+        return {
+            u"last_failure": self.last_failure,
+            u"failure_count": self.failure_count,
+            u"was_fatal": self.was_fatal,
+        }
+
+    def __str__(self):
+        template = "Last Failure: {0}, Total Failures: {1}, Fatal: {2}"
+        return template.format(self.last_failure, self.failure_count, self.was_fatal)