# Microsoft Azure Linux Agent
#
# Copyright 2014 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.4+ and Openssl 1.0+
import json
import os
import re
import time
import xml.sax.saxutils as saxutils
import azurelinuxagent.common.conf as conf
import azurelinuxagent.common.utils.fileutil as fileutil
import azurelinuxagent.common.utils.textutil as textutil
from azurelinuxagent.common.exception import ProtocolNotFoundError, \
ResourceGoneError
from azurelinuxagent.common.future import httpclient, bytebuffer
from azurelinuxagent.common.protocol.hostplugin import HostPluginProtocol
from azurelinuxagent.common.protocol.restapi import *
from azurelinuxagent.common.utils.cryptutil import CryptUtil
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, \
findtext, getattrib, gettext, remove_bom, get_bytes_from_pem, parse_json
VERSION_INFO_URI = "http://{0}/?comp=versions"
GOAL_STATE_URI = "http://{0}/machine/?comp=goalstate"
HEALTH_REPORT_URI = "http://{0}/machine?comp=health"
ROLE_PROP_URI = "http://{0}/machine?comp=roleProperties"
TELEMETRY_URI = "http://{0}/machine?comp=telemetrydata"
WIRE_SERVER_ADDR_FILE_NAME = "WireServer"
INCARNATION_FILE_NAME = "Incarnation"
GOAL_STATE_FILE_NAME = "GoalState.{0}.xml"
HOSTING_ENV_FILE_NAME = "HostingEnvironmentConfig.xml"
SHARED_CONF_FILE_NAME = "SharedConfig.xml"
CERTS_FILE_NAME = "Certificates.xml"
P7M_FILE_NAME = "Certificates.p7m"
PEM_FILE_NAME = "Certificates.pem"
EXT_CONF_FILE_NAME = "ExtensionsConfig.{0}.xml"
MANIFEST_FILE_NAME = "{0}.{1}.manifest.xml"
AGENTS_MANIFEST_FILE_NAME = "{0}.{1}.agentsManifest"
TRANSPORT_CERT_FILE_NAME = "TransportCert.pem"
TRANSPORT_PRV_FILE_NAME = "TransportPrivate.pem"
PROTOCOL_VERSION = "2012-11-30"
ENDPOINT_FINE_NAME = "WireServer"
SHORT_WAITING_INTERVAL = 1 # 1 second
class UploadError(HttpError):
pass
class WireProtocol(Protocol):
"""Slim layer to adapt wire protocol data to metadata protocol interface"""
# TODO: Clean-up goal state processing
# At present, some methods magically update GoalState (e.g.,
# get_vmagent_manifests), others (e.g., get_vmagent_pkgs)
# assume its presence. A better approach would make an explicit update
# call that returns the incarnation number and
# establishes that number the "context" for all other calls (either by
# updating the internal state of the protocol or
# by having callers pass the incarnation number to the method).
def __init__(self, endpoint):
if endpoint is None:
raise ProtocolError("WireProtocol endpoint is None")
self.endpoint = endpoint
self.client = WireClient(self.endpoint)
def detect(self):
self.client.check_wire_protocol_version()
trans_prv_file = os.path.join(conf.get_lib_dir(),
TRANSPORT_PRV_FILE_NAME)
trans_cert_file = os.path.join(conf.get_lib_dir(),
TRANSPORT_CERT_FILE_NAME)
cryptutil = CryptUtil(conf.get_openssl_cmd())
cryptutil.gen_transport_cert(trans_prv_file, trans_cert_file)
self.update_goal_state(forced=True)
def update_goal_state(self, forced=False, max_retry=3):
self.client.update_goal_state(forced=forced, max_retry=max_retry)
def get_vminfo(self):
goal_state = self.client.get_goal_state()
hosting_env = self.client.get_hosting_env()
vminfo = VMInfo()
vminfo.subscriptionId = None
vminfo.vmName = hosting_env.vm_name
vminfo.tenantName = hosting_env.deployment_name
vminfo.roleName = hosting_env.role_name
vminfo.roleInstanceName = goal_state.role_instance_id
vminfo.containerId = goal_state.container_id
return vminfo
def get_certs(self):
certificates = self.client.get_certs()
return certificates.cert_list
def get_incarnation(self):
path = os.path.join(conf.get_lib_dir(), INCARNATION_FILE_NAME)
if os.path.exists(path):
return fileutil.read_file(path)
else:
return 0
def get_vmagent_manifests(self):
# Update goal state to get latest extensions config
self.update_goal_state()
goal_state = self.client.get_goal_state()
ext_conf = self.client.get_ext_conf()
return ext_conf.vmagent_manifests, goal_state.incarnation
def get_vmagent_pkgs(self, vmagent_manifest):
goal_state = self.client.get_goal_state()
ga_manifest = self.client.get_gafamily_manifest(vmagent_manifest, goal_state)
valid_pkg_list = self.client.filter_package_list(vmagent_manifest.family, ga_manifest, goal_state)
return valid_pkg_list
def get_ext_handlers(self):
logger.verbose("Get extension handler config")
# Update goal state to get latest extensions config
self.update_goal_state()
goal_state = self.client.get_goal_state()
ext_conf = self.client.get_ext_conf()
# In wire protocol, incarnation is equivalent to ETag
return ext_conf.ext_handlers, goal_state.incarnation
def get_ext_handler_pkgs(self, ext_handler):
logger.verbose("Get extension handler package")
goal_state = self.client.get_goal_state()
man = self.client.get_ext_manifest(ext_handler, goal_state)
return man.pkg_list
def get_artifacts_profile(self):
logger.verbose("Get In-VM Artifacts Profile")
return self.client.get_artifacts_profile()
def download_ext_handler_pkg(self, uri, headers=None):
package = super(WireProtocol, self).download_ext_handler_pkg(uri)
if package is not None:
return package
else:
logger.warn("Download did not succeed, falling back to host plugin")
host = self.client.get_host_plugin()
uri, headers = host.get_artifact_request(uri, host.manifest_uri)
package = super(WireProtocol, self).download_ext_handler_pkg(uri, headers=headers)
return package
def report_provision_status(self, provision_status):
validate_param("provision_status", provision_status, ProvisionStatus)
if provision_status.status is not None:
self.client.report_health(provision_status.status,
provision_status.subStatus,
provision_status.description)
if provision_status.properties.certificateThumbprint is not None:
thumbprint = provision_status.properties.certificateThumbprint
self.client.report_role_prop(thumbprint)
def report_vm_status(self, vm_status):
validate_param("vm_status", vm_status, VMStatus)
self.client.status_blob.set_vm_status(vm_status)
self.client.upload_status_blob()
def report_ext_status(self, ext_handler_name, ext_name, ext_status):
validate_param("ext_status", ext_status, ExtensionStatus)
self.client.status_blob.set_ext_status(ext_handler_name, ext_status)
def report_event(self, events):
validate_param("events", events, TelemetryEventList)
self.client.report_event(events)
def _build_role_properties(container_id, role_instance_id, thumbprint):
xml = (u""
u""
u""
u"{0}"
u""
u""
u"{1}"
u""
u""
u""
u""
u""
u""
u""
u"").format(container_id, role_instance_id, thumbprint)
return xml
def _build_health_report(incarnation, container_id, role_instance_id,
status, substatus, description):
# Escape '&', '<' and '>'
description = saxutils.escape(ustr(description))
detail = u''
if substatus is not None:
substatus = saxutils.escape(ustr(substatus))
detail = (u""
u"{0}"
u"{1}"
u" ").format(substatus, description)
xml = (u""
u""
u"{0}"
u""
u"{1}"
u""
u""
u"{2}"
u""
u"{3}"
u"{4}"
u""
u""
u""
u""
u""
u"").format(incarnation,
container_id,
role_instance_id,
status,
detail)
return xml
"""
Convert VMStatus object to status blob format
"""
def ga_status_to_guest_info(ga_status):
v1_ga_guest_info = {
"computerName" : ga_status.hostname,
"osName" : ga_status.osname,
"osVersion" : ga_status.osversion,
"version" : ga_status.version,
}
return v1_ga_guest_info
def ga_status_to_v1(ga_status):
formatted_msg = {
'lang': 'en-US',
'message': ga_status.message
}
v1_ga_status = {
"version" : ga_status.version,
"status" : ga_status.status,
"formattedMessage" : formatted_msg
}
return v1_ga_status
def ext_substatus_to_v1(sub_status_list):
status_list = []
for substatus in sub_status_list:
status = {
"name": substatus.name,
"status": substatus.status,
"code": substatus.code,
"formattedMessage": {
"lang": "en-US",
"message": substatus.message
}
}
status_list.append(status)
return status_list
def ext_status_to_v1(ext_name, ext_status):
if ext_status is None:
return None
timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
v1_sub_status = ext_substatus_to_v1(ext_status.substatusList)
v1_ext_status = {
"status": {
"name": ext_name,
"configurationAppliedTime": ext_status.configurationAppliedTime,
"operation": ext_status.operation,
"status": ext_status.status,
"code": ext_status.code,
"formattedMessage": {
"lang": "en-US",
"message": ext_status.message
}
},
"version": 1.0,
"timestampUTC": timestamp
}
if len(v1_sub_status) != 0:
v1_ext_status['status']['substatus'] = v1_sub_status
return v1_ext_status
def ext_handler_status_to_v1(handler_status, ext_statuses, timestamp):
v1_handler_status = {
'handlerVersion': handler_status.version,
'handlerName': handler_status.name,
'status': handler_status.status,
'code': handler_status.code
}
if handler_status.message is not None:
v1_handler_status["formattedMessage"] = {
"lang": "en-US",
"message": handler_status.message
}
if handler_status.upgradeGuid is not None:
v1_handler_status["upgradeGuid"] = handler_status.upgradeGuid
if len(handler_status.extensions) > 0:
# Currently, no more than one extension per handler
ext_name = handler_status.extensions[0]
ext_status = ext_statuses.get(ext_name)
v1_ext_status = ext_status_to_v1(ext_name, ext_status)
if ext_status is not None and v1_ext_status is not None:
v1_handler_status["runtimeSettingsStatus"] = {
'settingsStatus': v1_ext_status,
'sequenceNumber': ext_status.sequenceNumber
}
return v1_handler_status
def vm_status_to_v1(vm_status, ext_statuses):
timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
v1_ga_guest_info = ga_status_to_guest_info(vm_status.vmAgent)
v1_ga_status = ga_status_to_v1(vm_status.vmAgent)
v1_handler_status_list = []
for handler_status in vm_status.vmAgent.extensionHandlers:
v1_handler_status = ext_handler_status_to_v1(handler_status,
ext_statuses, timestamp)
if v1_handler_status is not None:
v1_handler_status_list.append(v1_handler_status)
v1_agg_status = {
'guestAgentStatus': v1_ga_status,
'handlerAggregateStatus': v1_handler_status_list
}
v1_vm_status = {
'version': '1.1',
'timestampUTC': timestamp,
'aggregateStatus': v1_agg_status,
'guestOSInfo' : v1_ga_guest_info
}
return v1_vm_status
class StatusBlob(object):
def __init__(self, client):
self.vm_status = None
self.ext_statuses = {}
self.client = client
self.type = None
self.data = None
def set_vm_status(self, vm_status):
validate_param("vmAgent", vm_status, VMStatus)
self.vm_status = vm_status
def set_ext_status(self, ext_handler_name, ext_status):
validate_param("extensionStatus", ext_status, ExtensionStatus)
self.ext_statuses[ext_handler_name] = ext_status
def to_json(self):
report = vm_status_to_v1(self.vm_status, self.ext_statuses)
return json.dumps(report)
__storage_version__ = "2014-02-14"
def prepare(self, blob_type):
logger.verbose("Prepare status blob")
self.data = self.to_json()
self.type = blob_type
def upload(self, url):
try:
if not self.type in ["BlockBlob", "PageBlob"]:
raise ProtocolError("Illegal blob type: {0}".format(self.type))
if self.type == "BlockBlob":
self.put_block_blob(url, self.data)
else:
self.put_page_blob(url, self.data)
return True
except Exception as e:
logger.verbose("Initial status upload failed: {0}", e)
return False
def get_block_blob_headers(self, blob_size):
return {
"Content-Length": ustr(blob_size),
"x-ms-blob-type": "BlockBlob",
"x-ms-date": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"x-ms-version": self.__class__.__storage_version__
}
def put_block_blob(self, url, data):
logger.verbose("Put block blob")
headers = self.get_block_blob_headers(len(data))
resp = self.client.call_storage_service(restutil.http_put, url, data, headers)
if resp.status != httpclient.CREATED:
raise UploadError(
"Failed to upload block blob: {0}".format(resp.status))
def get_page_blob_create_headers(self, blob_size):
return {
"Content-Length": "0",
"x-ms-blob-content-length": ustr(blob_size),
"x-ms-blob-type": "PageBlob",
"x-ms-date": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"x-ms-version": self.__class__.__storage_version__
}
def get_page_blob_page_headers(self, start, end):
return {
"Content-Length": ustr(end - start),
"x-ms-date": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"x-ms-range": "bytes={0}-{1}".format(start, end - 1),
"x-ms-page-write": "update",
"x-ms-version": self.__class__.__storage_version__
}
def put_page_blob(self, url, data):
logger.verbose("Put page blob")
# Convert string into bytes and align to 512 bytes
data = bytearray(data, encoding='utf-8')
page_blob_size = int((len(data) + 511) / 512) * 512
headers = self.get_page_blob_create_headers(page_blob_size)
resp = self.client.call_storage_service(restutil.http_put, url, "", headers)
if resp.status != httpclient.CREATED:
raise UploadError(
"Failed to clean up page blob: {0}".format(resp.status))
if url.count("?") <= 0:
url = "{0}?comp=page".format(url)
else:
url = "{0}&comp=page".format(url)
logger.verbose("Upload page blob")
page_max = 4 * 1024 * 1024 # Max page size: 4MB
start = 0
end = 0
while end < len(data):
end = min(len(data), start + page_max)
content_size = end - start
# Align to 512 bytes
page_end = int((end + 511) / 512) * 512
buf_size = page_end - start
buf = bytearray(buf_size)
buf[0: content_size] = data[start: end]
headers = self.get_page_blob_page_headers(start, page_end)
resp = self.client.call_storage_service(
restutil.http_put,
url,
bytebuffer(buf),
headers)
if resp is None or resp.status != httpclient.CREATED:
raise UploadError(
"Failed to upload page blob: {0}".format(resp.status))
start = end
def event_param_to_v1(param):
param_format = ''
param_type = type(param.value)
attr_type = ""
if param_type is int:
attr_type = 'mt:uint64'
elif param_type is str:
attr_type = 'mt:wstr'
elif ustr(param_type).count("'unicode'") > 0:
attr_type = 'mt:wstr'
elif param_type is bool:
attr_type = 'mt:bool'
elif param_type is float:
attr_type = 'mt:float64'
return param_format.format(param.name,
saxutils.quoteattr(ustr(param.value)),
attr_type)
def event_to_v1(event):
params = ""
for param in event.parameters:
params += event_param_to_v1(param)
event_str = (''
''
'').format(event.eventId, params)
return event_str
class WireClient(object):
def __init__(self, endpoint):
logger.info("Wire server endpoint:{0}", endpoint)
self.endpoint = endpoint
self.goal_state = None
self.updated = None
self.hosting_env = None
self.shared_conf = None
self.certs = None
self.ext_conf = None
self.host_plugin = None
self.status_blob = StatusBlob(self)
def call_wireserver(self, http_req, *args, **kwargs):
try:
# Never use the HTTP proxy for wireserver
kwargs['use_proxy'] = False
resp = http_req(*args, **kwargs)
if restutil.request_failed(resp):
msg = "[Wireserver Failed] URI {0} ".format(args[0])
if resp is not None:
msg += " [HTTP Failed] Status Code {0}".format(resp.status)
raise ProtocolError(msg)
# If the GoalState is stale, pass along the exception to the caller
except ResourceGoneError:
raise
except Exception as e:
raise ProtocolError("[Wireserver Exception] {0}".format(
ustr(e)))
return resp
def decode_config(self, data):
if data is None:
return None
data = remove_bom(data)
xml_text = ustr(data, encoding='utf-8')
return xml_text
def fetch_config(self, uri, headers):
resp = self.call_wireserver(restutil.http_get,
uri,
headers=headers)
return self.decode_config(resp.read())
def fetch_cache(self, local_file):
if not os.path.isfile(local_file):
raise ProtocolError("{0} is missing.".format(local_file))
try:
return fileutil.read_file(local_file)
except IOError as e:
raise ProtocolError("Failed to read cache: {0}".format(e))
def save_cache(self, local_file, data):
try:
fileutil.write_file(local_file, data)
except IOError as e:
fileutil.clean_ioerror(e,
paths=[local_file])
raise ProtocolError("Failed to write cache: {0}".format(e))
@staticmethod
def call_storage_service(http_req, *args, **kwargs):
# Default to use the configured HTTP proxy
if not 'use_proxy' in kwargs or kwargs['use_proxy'] is None:
kwargs['use_proxy'] = True
return http_req(*args, **kwargs)
def fetch_manifest(self, version_uris):
logger.verbose("Fetch manifest")
for version in version_uris:
response = None
if not HostPluginProtocol.is_default_channel():
response = self.fetch(version.uri)
if not response:
if HostPluginProtocol.is_default_channel():
logger.verbose("Using host plugin as default channel")
else:
logger.verbose("Failed to download manifest, "
"switching to host plugin")
try:
host = self.get_host_plugin()
uri, headers = host.get_artifact_request(version.uri)
response = self.fetch(uri, headers, use_proxy=False)
# If the HostPlugin rejects the request,
# let the error continue, but set to use the HostPlugin
except ResourceGoneError:
HostPluginProtocol.set_default_channel(True)
raise
host.manifest_uri = version.uri
logger.verbose("Manifest downloaded successfully from host plugin")
if not HostPluginProtocol.is_default_channel():
logger.info("Setting host plugin as default channel")
HostPluginProtocol.set_default_channel(True)
if response:
return response
raise ProtocolError("Failed to fetch manifest from all sources")
def fetch(self, uri, headers=None, use_proxy=None):
logger.verbose("Fetch [{0}] with headers [{1}]", uri, headers)
try:
resp = self.call_storage_service(
restutil.http_get,
uri,
headers=headers,
use_proxy=use_proxy)
if restutil.request_failed(resp):
msg = "[Storage Failed] URI {0} ".format(uri)
if resp is not None:
msg += restutil.read_response_error(resp)
logger.warn(msg)
raise ProtocolError(msg)
return self.decode_config(resp.read())
except (HttpError, ProtocolError) as e:
logger.verbose("Fetch failed from [{0}]: {1}", uri, e)
if isinstance(e, ResourceGoneError):
raise
return None
def update_hosting_env(self, goal_state):
if goal_state.hosting_env_uri is None:
raise ProtocolError("HostingEnvironmentConfig uri is empty")
local_file = os.path.join(conf.get_lib_dir(), HOSTING_ENV_FILE_NAME)
xml_text = self.fetch_config(goal_state.hosting_env_uri,
self.get_header())
self.save_cache(local_file, xml_text)
self.hosting_env = HostingEnv(xml_text)
def update_shared_conf(self, goal_state):
if goal_state.shared_conf_uri is None:
raise ProtocolError("SharedConfig uri is empty")
local_file = os.path.join(conf.get_lib_dir(), SHARED_CONF_FILE_NAME)
xml_text = self.fetch_config(goal_state.shared_conf_uri,
self.get_header())
self.save_cache(local_file, xml_text)
self.shared_conf = SharedConfig(xml_text)
def update_certs(self, goal_state):
if goal_state.certs_uri is None:
return
local_file = os.path.join(conf.get_lib_dir(), CERTS_FILE_NAME)
xml_text = self.fetch_config(goal_state.certs_uri,
self.get_header_for_cert())
self.save_cache(local_file, xml_text)
self.certs = Certificates(self, xml_text)
def update_ext_conf(self, goal_state):
if goal_state.ext_uri is None:
logger.info("ExtensionsConfig.xml uri is empty")
self.ext_conf = ExtensionsConfig(None)
return
incarnation = goal_state.incarnation
local_file = os.path.join(conf.get_lib_dir(),
EXT_CONF_FILE_NAME.format(incarnation))
xml_text = self.fetch_config(goal_state.ext_uri, self.get_header())
self.save_cache(local_file, xml_text)
self.ext_conf = ExtensionsConfig(xml_text)
def update_goal_state(self, forced=False, max_retry=3):
incarnation_file = os.path.join(conf.get_lib_dir(),
INCARNATION_FILE_NAME)
uri = GOAL_STATE_URI.format(self.endpoint)
# Start updating goalstate, retry on 410
fetch_goal_state = True
for retry in range(0, max_retry):
try:
if fetch_goal_state:
fetch_goal_state = False
xml_text = self.fetch_config(uri, self.get_header())
goal_state = GoalState(xml_text)
if not forced:
last_incarnation = None
if os.path.isfile(incarnation_file):
last_incarnation = fileutil.read_file(
incarnation_file)
new_incarnation = goal_state.incarnation
if last_incarnation is not None and \
last_incarnation == new_incarnation:
# Goalstate is not updated.
return
self.goal_state = goal_state
file_name = GOAL_STATE_FILE_NAME.format(goal_state.incarnation)
goal_state_file = os.path.join(conf.get_lib_dir(), file_name)
self.save_cache(goal_state_file, xml_text)
self.update_hosting_env(goal_state)
self.update_shared_conf(goal_state)
self.update_certs(goal_state)
self.update_ext_conf(goal_state)
self.save_cache(incarnation_file, goal_state.incarnation)
if self.host_plugin is not None:
self.host_plugin.container_id = goal_state.container_id
self.host_plugin.role_config_name = goal_state.role_config_name
return
except ResourceGoneError:
logger.info("GoalState is stale -- re-fetching")
fetch_goal_state = True
except Exception as e:
log_method = logger.info \
if type(e) is ProtocolError \
else logger.warn
log_method(
"Exception processing GoalState-related files: {0}".format(
ustr(e)))
if retry < max_retry-1:
continue
raise
raise ProtocolError("Exceeded max retry updating goal state")
def get_goal_state(self):
if self.goal_state is None:
incarnation_file = os.path.join(conf.get_lib_dir(),
INCARNATION_FILE_NAME)
incarnation = self.fetch_cache(incarnation_file)
file_name = GOAL_STATE_FILE_NAME.format(incarnation)
goal_state_file = os.path.join(conf.get_lib_dir(), file_name)
xml_text = self.fetch_cache(goal_state_file)
self.goal_state = GoalState(xml_text)
return self.goal_state
def get_hosting_env(self):
if self.hosting_env is None:
local_file = os.path.join(conf.get_lib_dir(),
HOSTING_ENV_FILE_NAME)
xml_text = self.fetch_cache(local_file)
self.hosting_env = HostingEnv(xml_text)
return self.hosting_env
def get_shared_conf(self):
if self.shared_conf is None:
local_file = os.path.join(conf.get_lib_dir(),
SHARED_CONF_FILE_NAME)
xml_text = self.fetch_cache(local_file)
self.shared_conf = SharedConfig(xml_text)
return self.shared_conf
def get_certs(self):
if self.certs is None:
local_file = os.path.join(conf.get_lib_dir(), CERTS_FILE_NAME)
xml_text = self.fetch_cache(local_file)
self.certs = Certificates(self, xml_text)
if self.certs is None:
return None
return self.certs
def get_ext_conf(self):
if self.ext_conf is None:
goal_state = self.get_goal_state()
if goal_state.ext_uri is None:
self.ext_conf = ExtensionsConfig(None)
else:
local_file = EXT_CONF_FILE_NAME.format(goal_state.incarnation)
local_file = os.path.join(conf.get_lib_dir(), local_file)
xml_text = self.fetch_cache(local_file)
self.ext_conf = ExtensionsConfig(xml_text)
return self.ext_conf
def get_ext_manifest(self, ext_handler, goal_state):
for update_goal_state in [False, True]:
try:
if update_goal_state:
self.update_goal_state(forced=True)
goal_state = self.get_goal_state()
local_file = MANIFEST_FILE_NAME.format(
ext_handler.name,
goal_state.incarnation)
local_file = os.path.join(conf.get_lib_dir(), local_file)
xml_text = self.fetch_manifest(ext_handler.versionUris)
self.save_cache(local_file, xml_text)
return ExtensionManifest(xml_text)
except ResourceGoneError:
continue
raise ProtocolError("Failed to retrieve extension manifest")
def filter_package_list(self, family, ga_manifest, goal_state):
complete_list = ga_manifest.pkg_list
agent_manifest = os.path.join(conf.get_lib_dir(),
AGENTS_MANIFEST_FILE_NAME.format(
family,
goal_state.incarnation))
if not os.path.exists(agent_manifest):
# clear memory cache
ga_manifest.allowed_versions = None
# create disk cache
with open(agent_manifest, mode='w') as manifest_fh:
for version in complete_list.versions:
manifest_fh.write('{0}\n'.format(version.version))
fileutil.chmod(agent_manifest, 0o644)
return complete_list
else:
# use allowed versions from cache, otherwise from disk
if ga_manifest.allowed_versions is None:
with open(agent_manifest, mode='r') as manifest_fh:
ga_manifest.allowed_versions = [v.strip('\n') for v
in manifest_fh.readlines()]
# use the updated manifest urls for allowed versions
allowed_list = ExtHandlerPackageList()
allowed_list.versions = [version for version
in complete_list.versions
if version.version
in ga_manifest.allowed_versions]
return allowed_list
def get_gafamily_manifest(self, vmagent_manifest, goal_state):
for update_goal_state in [False, True]:
try:
if update_goal_state:
self.update_goal_state(forced=True)
goal_state = self.get_goal_state()
local_file = MANIFEST_FILE_NAME.format(
vmagent_manifest.family,
goal_state.incarnation)
local_file = os.path.join(conf.get_lib_dir(), local_file)
xml_text = self.fetch_manifest(
vmagent_manifest.versionsManifestUris)
fileutil.write_file(local_file, xml_text)
return ExtensionManifest(xml_text)
except ResourceGoneError:
continue
raise ProtocolError("Failed to retrieve GAFamily manifest")
def check_wire_protocol_version(self):
uri = VERSION_INFO_URI.format(self.endpoint)
version_info_xml = self.fetch_config(uri, None)
version_info = VersionInfo(version_info_xml)
preferred = version_info.get_preferred()
if PROTOCOL_VERSION == preferred:
logger.info("Wire protocol version:{0}", PROTOCOL_VERSION)
elif PROTOCOL_VERSION in version_info.get_supported():
logger.info("Wire protocol version:{0}", PROTOCOL_VERSION)
logger.warn("Server preferred version:{0}", preferred)
else:
error = ("Agent supported wire protocol version: {0} was not "
"advised by Fabric.").format(PROTOCOL_VERSION)
raise ProtocolNotFoundError(error)
def upload_status_blob(self):
for update_goal_state in [False, True]:
try:
if update_goal_state:
self.update_goal_state(forced=True)
ext_conf = self.get_ext_conf()
blob_uri = ext_conf.status_upload_blob
blob_type = ext_conf.status_upload_blob_type
if blob_uri is not None:
if not blob_type in ["BlockBlob", "PageBlob"]:
blob_type = "BlockBlob"
logger.verbose("Status Blob type is unspecified "
"-- assuming it is a BlockBlob")
try:
self.status_blob.prepare(blob_type)
except Exception as e:
self.report_status_event(
"Exception creating status blob: {0}", ustr(e))
return
if not HostPluginProtocol.is_default_channel():
try:
if self.status_blob.upload(blob_uri):
return
except HttpError as e:
pass
host = self.get_host_plugin()
host.put_vm_status(self.status_blob,
ext_conf.status_upload_blob,
ext_conf.status_upload_blob_type)
HostPluginProtocol.set_default_channel(True)
return
except Exception as e:
# If the HostPlugin rejects the request,
# let the error continue, but set to use the HostPlugin
if isinstance(e, ResourceGoneError):
HostPluginProtocol.set_default_channel(True)
continue
self.report_status_event(
"Exception uploading status blob: {0}", ustr(e))
return
def report_role_prop(self, thumbprint):
goal_state = self.get_goal_state()
role_prop = _build_role_properties(goal_state.container_id,
goal_state.role_instance_id,
thumbprint)
role_prop = role_prop.encode("utf-8")
role_prop_uri = ROLE_PROP_URI.format(self.endpoint)
headers = self.get_header_for_xml_content()
try:
resp = self.call_wireserver(restutil.http_post,
role_prop_uri,
role_prop,
headers=headers)
except HttpError as e:
raise ProtocolError((u"Failed to send role properties: "
u"{0}").format(e))
if resp.status != httpclient.ACCEPTED:
raise ProtocolError((u"Failed to send role properties: "
u",{0}: {1}").format(resp.status,
resp.read()))
def report_health(self, status, substatus, description):
goal_state = self.get_goal_state()
health_report = _build_health_report(goal_state.incarnation,
goal_state.container_id,
goal_state.role_instance_id,
status,
substatus,
description)
health_report = health_report.encode("utf-8")
health_report_uri = HEALTH_REPORT_URI.format(self.endpoint)
headers = self.get_header_for_xml_content()
try:
# 30 retries with 10s sleep gives ~5min for wireserver updates;
# this is retried 3 times with 15s sleep before throwing a
# ProtocolError, for a total of ~15min.
resp = self.call_wireserver(restutil.http_post,
health_report_uri,
health_report,
headers=headers,
max_retry=30,
retry_delay=15)
except HttpError as e:
raise ProtocolError((u"Failed to send provision status: "
u"{0}").format(e))
if restutil.request_failed(resp):
raise ProtocolError((u"Failed to send provision status: "
u",{0}: {1}").format(resp.status,
resp.read()))
def send_event(self, provider_id, event_str):
uri = TELEMETRY_URI.format(self.endpoint)
data_format = (''
''
'{1}'
''
'')
data = data_format.format(provider_id, event_str)
try:
header = self.get_header_for_xml_content()
resp = self.call_wireserver(restutil.http_post, uri, data, header)
except HttpError as e:
raise ProtocolError("Failed to send events:{0}".format(e))
if restutil.request_failed(resp):
logger.verbose(resp.read())
raise ProtocolError(
"Failed to send events:{0}".format(resp.status))
def report_event(self, event_list):
buf = {}
# Group events by providerId
for event in event_list.events:
if event.providerId not in buf:
buf[event.providerId] = ""
event_str = event_to_v1(event)
if len(event_str) >= 63 * 1024:
logger.warn("Single event too large: {0}", event_str[300:])
continue
if len(buf[event.providerId] + event_str) >= 63 * 1024:
self.send_event(event.providerId, buf[event.providerId])
buf[event.providerId] = ""
buf[event.providerId] = buf[event.providerId] + event_str
# Send out all events left in buffer.
for provider_id in list(buf.keys()):
if len(buf[provider_id]) > 0:
self.send_event(provider_id, buf[provider_id])
def report_status_event(self, message, *args):
from azurelinuxagent.common.event import report_event, \
WALAEventOperation
message = message.format(*args)
logger.warn(message)
report_event(op=WALAEventOperation.ReportStatus,
is_success=False,
message=message)
def get_header(self):
return {
"x-ms-agent-name": "WALinuxAgent",
"x-ms-version": PROTOCOL_VERSION
}
def get_header_for_xml_content(self):
return {
"x-ms-agent-name": "WALinuxAgent",
"x-ms-version": PROTOCOL_VERSION,
"Content-Type": "text/xml;charset=utf-8"
}
def get_header_for_cert(self):
trans_cert_file = os.path.join(conf.get_lib_dir(),
TRANSPORT_CERT_FILE_NAME)
content = self.fetch_cache(trans_cert_file)
cert = get_bytes_from_pem(content)
return {
"x-ms-agent-name": "WALinuxAgent",
"x-ms-version": PROTOCOL_VERSION,
"x-ms-cipher-name": "DES_EDE3_CBC",
"x-ms-guest-agent-public-x509-cert": cert
}
def get_host_plugin(self):
if self.host_plugin is None:
goal_state = self.get_goal_state()
self.host_plugin = HostPluginProtocol(self.endpoint,
goal_state.container_id,
goal_state.role_config_name)
return self.host_plugin
def has_artifacts_profile_blob(self):
return self.ext_conf and not \
textutil.is_str_none_or_whitespace(self.ext_conf.artifacts_profile_blob)
def get_artifacts_profile(self):
artifacts_profile = None
for update_goal_state in [False, True]:
try:
if update_goal_state:
self.update_goal_state(forced=True)
if self.has_artifacts_profile_blob():
blob = self.ext_conf.artifacts_profile_blob
profile = None
if not HostPluginProtocol.is_default_channel():
logger.verbose("Retrieving the artifacts profile")
profile = self.fetch(blob)
if profile is None:
if HostPluginProtocol.is_default_channel():
logger.verbose("Using host plugin as default channel")
else:
logger.verbose("Failed to download artifacts profile, "
"switching to host plugin")
host = self.get_host_plugin()
uri, headers = host.get_artifact_request(blob)
config = self.fetch(uri, headers, use_proxy=False)
profile = self.decode_config(config)
if not textutil.is_str_none_or_whitespace(profile):
logger.verbose("Artifacts profile downloaded")
artifacts_profile = InVMArtifactsProfile(profile)
return artifacts_profile
except ResourceGoneError:
HostPluginProtocol.set_default_channel(True)
continue
except Exception as e:
logger.warn(
"Exception retrieving artifacts profile: {0}".format(
ustr(e)))
return None
class VersionInfo(object):
def __init__(self, xml_text):
"""
Query endpoint server for wire protocol version.
Fail if our desired protocol version is not seen.
"""
logger.verbose("Load Version.xml")
self.parse(xml_text)
def parse(self, xml_text):
xml_doc = parse_doc(xml_text)
preferred = find(xml_doc, "Preferred")
self.preferred = findtext(preferred, "Version")
logger.info("Fabric preferred wire protocol version:{0}",
self.preferred)
self.supported = []
supported = find(xml_doc, "Supported")
supported_version = findall(supported, "Version")
for node in supported_version:
version = gettext(node)
logger.verbose("Fabric supported wire protocol version:{0}",
version)
self.supported.append(version)
def get_preferred(self):
return self.preferred
def get_supported(self):
return self.supported
class GoalState(object):
def __init__(self, xml_text):
if xml_text is None:
raise ValueError("GoalState.xml is None")
logger.verbose("Load GoalState.xml")
self.incarnation = None
self.expected_state = None
self.hosting_env_uri = None
self.shared_conf_uri = None
self.certs_uri = None
self.ext_uri = None
self.role_instance_id = None
self.role_config_name = None
self.container_id = None
self.load_balancer_probe_port = None
self.xml_text = None
self.parse(xml_text)
def parse(self, xml_text):
"""
Request configuration data from endpoint server.
"""
self.xml_text = xml_text
xml_doc = parse_doc(xml_text)
self.incarnation = findtext(xml_doc, "Incarnation")
self.expected_state = findtext(xml_doc, "ExpectedState")
self.hosting_env_uri = findtext(xml_doc, "HostingEnvironmentConfig")
self.shared_conf_uri = findtext(xml_doc, "SharedConfig")
self.certs_uri = findtext(xml_doc, "Certificates")
self.ext_uri = findtext(xml_doc, "ExtensionsConfig")
role_instance = find(xml_doc, "RoleInstance")
self.role_instance_id = findtext(role_instance, "InstanceId")
role_config = find(role_instance, "Configuration")
self.role_config_name = findtext(role_config, "ConfigName")
container = find(xml_doc, "Container")
self.container_id = findtext(container, "ContainerId")
lbprobe_ports = find(xml_doc, "LBProbePorts")
self.load_balancer_probe_port = findtext(lbprobe_ports, "Port")
return self
class HostingEnv(object):
"""
parse Hosting enviromnet config and store in
HostingEnvironmentConfig.xml
"""
def __init__(self, xml_text):
if xml_text is None:
raise ValueError("HostingEnvironmentConfig.xml is None")
logger.verbose("Load HostingEnvironmentConfig.xml")
self.vm_name = None
self.role_name = None
self.deployment_name = None
self.xml_text = None
self.parse(xml_text)
def parse(self, xml_text):
"""
parse and create HostingEnvironmentConfig.xml.
"""
self.xml_text = xml_text
xml_doc = parse_doc(xml_text)
incarnation = find(xml_doc, "Incarnation")
self.vm_name = getattrib(incarnation, "instance")
role = find(xml_doc, "Role")
self.role_name = getattrib(role, "name")
deployment = find(xml_doc, "Deployment")
self.deployment_name = getattrib(deployment, "name")
return self
class SharedConfig(object):
"""
parse role endpoint server and goal state config.
"""
def __init__(self, xml_text):
logger.verbose("Load SharedConfig.xml")
self.parse(xml_text)
def parse(self, xml_text):
"""
parse and write configuration to file SharedConfig.xml.
"""
# Not used currently
return self
class Certificates(object):
"""
Object containing certificates of host and provisioned user.
"""
def __init__(self, client, xml_text):
logger.verbose("Load Certificates.xml")
self.client = client
self.cert_list = CertList()
self.parse(xml_text)
def parse(self, xml_text):
"""
Parse multiple certificates into seperate files.
"""
xml_doc = parse_doc(xml_text)
data = findtext(xml_doc, "Data")
if data is None:
return
cryptutil = CryptUtil(conf.get_openssl_cmd())
p7m_file = os.path.join(conf.get_lib_dir(), P7M_FILE_NAME)
p7m = ("MIME-Version:1.0\n"
"Content-Disposition: attachment; filename=\"{0}\"\n"
"Content-Type: application/x-pkcs7-mime; name=\"{1}\"\n"
"Content-Transfer-Encoding: base64\n"
"\n"
"{2}").format(p7m_file, p7m_file, data)
self.client.save_cache(p7m_file, p7m)
trans_prv_file = os.path.join(conf.get_lib_dir(),
TRANSPORT_PRV_FILE_NAME)
trans_cert_file = os.path.join(conf.get_lib_dir(),
TRANSPORT_CERT_FILE_NAME)
pem_file = os.path.join(conf.get_lib_dir(), PEM_FILE_NAME)
# decrypt certificates
cryptutil.decrypt_p7m(p7m_file, trans_prv_file, trans_cert_file,
pem_file)
# The parsing process use public key to match prv and crt.
buf = []
begin_crt = False
begin_prv = False
prvs = {}
thumbprints = {}
index = 0
v1_cert_list = []
with open(pem_file) as pem:
for line in pem.readlines():
buf.append(line)
if re.match(r'[-]+BEGIN.*KEY[-]+', line):
begin_prv = True
elif re.match(r'[-]+BEGIN.*CERTIFICATE[-]+', line):
begin_crt = True
elif re.match(r'[-]+END.*KEY[-]+', line):
tmp_file = self.write_to_tmp_file(index, 'prv', buf)
pub = cryptutil.get_pubkey_from_prv(tmp_file)
prvs[pub] = tmp_file
buf = []
index += 1
begin_prv = False
elif re.match(r'[-]+END.*CERTIFICATE[-]+', line):
tmp_file = self.write_to_tmp_file(index, 'crt', buf)
pub = cryptutil.get_pubkey_from_crt(tmp_file)
thumbprint = cryptutil.get_thumbprint_from_crt(tmp_file)
thumbprints[pub] = thumbprint
# Rename crt with thumbprint as the file name
crt = "{0}.crt".format(thumbprint)
v1_cert_list.append({
"name": None,
"thumbprint": thumbprint
})
os.rename(tmp_file, os.path.join(conf.get_lib_dir(), crt))
buf = []
index += 1
begin_crt = False
# Rename prv key with thumbprint as the file name
for pubkey in prvs:
thumbprint = thumbprints[pubkey]
if thumbprint:
tmp_file = prvs[pubkey]
prv = "{0}.prv".format(thumbprint)
os.rename(tmp_file, os.path.join(conf.get_lib_dir(), prv))
for v1_cert in v1_cert_list:
cert = Cert()
set_properties("certs", cert, v1_cert)
self.cert_list.certificates.append(cert)
def write_to_tmp_file(self, index, suffix, buf):
file_name = os.path.join(conf.get_lib_dir(),
"{0}.{1}".format(index, suffix))
self.client.save_cache(file_name, "".join(buf))
return file_name
class ExtensionsConfig(object):
"""
parse ExtensionsConfig, downloading and unpacking them to /var/lib/waagent.
Install if true, remove if it is set to false.
"""
def __init__(self, xml_text):
logger.verbose("Load ExtensionsConfig.xml")
self.ext_handlers = ExtHandlerList()
self.vmagent_manifests = VMAgentManifestList()
self.status_upload_blob = None
self.status_upload_blob_type = None
self.artifacts_profile_blob = None
if xml_text is not None:
self.parse(xml_text)
def parse(self, xml_text):
"""
Write configuration to file ExtensionsConfig.xml.
"""
xml_doc = parse_doc(xml_text)
ga_families_list = find(xml_doc, "GAFamilies")
ga_families = findall(ga_families_list, "GAFamily")
for ga_family in ga_families:
family = findtext(ga_family, "Name")
uris_list = find(ga_family, "Uris")
uris = findall(uris_list, "Uri")
manifest = VMAgentManifest()
manifest.family = family
for uri in uris:
manifestUri = VMAgentManifestUri(uri=gettext(uri))
manifest.versionsManifestUris.append(manifestUri)
self.vmagent_manifests.vmAgentManifests.append(manifest)
plugins_list = find(xml_doc, "Plugins")
plugins = findall(plugins_list, "Plugin")
plugin_settings_list = find(xml_doc, "PluginSettings")
plugin_settings = findall(plugin_settings_list, "Plugin")
for plugin in plugins:
ext_handler = self.parse_plugin(plugin)
self.ext_handlers.extHandlers.append(ext_handler)
self.parse_plugin_settings(ext_handler, plugin_settings)
self.status_upload_blob = findtext(xml_doc, "StatusUploadBlob")
self.artifacts_profile_blob = findtext(xml_doc, "InVMArtifactsProfileBlob")
status_upload_node = find(xml_doc, "StatusUploadBlob")
self.status_upload_blob_type = getattrib(status_upload_node,
"statusBlobType")
logger.verbose("Extension config shows status blob type as [{0}]",
self.status_upload_blob_type)
def parse_plugin(self, plugin):
ext_handler = ExtHandler()
ext_handler.name = getattrib(plugin, "name")
ext_handler.properties.version = getattrib(plugin, "version")
ext_handler.properties.state = getattrib(plugin, "state")
ext_handler.properties.upgradeGuid = getattrib(plugin, "upgradeGuid")
if not ext_handler.properties.upgradeGuid:
ext_handler.properties.upgradeGuid = None
auto_upgrade = getattrib(plugin, "autoUpgrade")
if auto_upgrade is not None and auto_upgrade.lower() == "true":
ext_handler.properties.upgradePolicy = "auto"
else:
ext_handler.properties.upgradePolicy = "manual"
location = getattrib(plugin, "location")
failover_location = getattrib(plugin, "failoverlocation")
for uri in [location, failover_location]:
version_uri = ExtHandlerVersionUri()
version_uri.uri = uri
ext_handler.versionUris.append(version_uri)
return ext_handler
def parse_plugin_settings(self, ext_handler, plugin_settings):
if plugin_settings is None:
return
name = ext_handler.name
version = ext_handler.properties.version
settings = [x for x in plugin_settings \
if getattrib(x, "name") == name and \
getattrib(x, "version") == version]
if settings is None or len(settings) == 0:
return
runtime_settings = None
runtime_settings_node = find(settings[0], "RuntimeSettings")
seqNo = getattrib(runtime_settings_node, "seqNo")
runtime_settings_str = gettext(runtime_settings_node)
try:
runtime_settings = json.loads(runtime_settings_str)
except ValueError as e:
logger.error("Invalid extension settings")
return
for plugin_settings_list in runtime_settings["runtimeSettings"]:
handler_settings = plugin_settings_list["handlerSettings"]
ext = Extension()
# There is no "extension name" in wire protocol.
# Put
ext.name = ext_handler.name
ext.sequenceNumber = seqNo
ext.publicSettings = handler_settings.get("publicSettings")
ext.protectedSettings = handler_settings.get("protectedSettings")
thumbprint = handler_settings.get(
"protectedSettingsCertThumbprint")
ext.certificateThumbprint = thumbprint
ext_handler.properties.extensions.append(ext)
class ExtensionManifest(object):
def __init__(self, xml_text):
if xml_text is None:
raise ValueError("ExtensionManifest is None")
logger.verbose("Load ExtensionManifest.xml")
self.pkg_list = ExtHandlerPackageList()
self.allowed_versions = None
self.parse(xml_text)
def parse(self, xml_text):
xml_doc = parse_doc(xml_text)
self._handle_packages(findall(find(xml_doc,
"Plugins"),
"Plugin"),
False)
self._handle_packages(findall(find(xml_doc,
"InternalPlugins"),
"Plugin"),
True)
def _handle_packages(self, packages, isinternal):
for package in packages:
version = findtext(package, "Version")
disallow_major_upgrade = findtext(package,
"DisallowMajorVersionUpgrade")
if disallow_major_upgrade is None:
disallow_major_upgrade = ''
disallow_major_upgrade = disallow_major_upgrade.lower() == "true"
uris = find(package, "Uris")
uri_list = findall(uris, "Uri")
uri_list = [gettext(x) for x in uri_list]
pkg = ExtHandlerPackage()
pkg.version = version
pkg.disallow_major_upgrade = disallow_major_upgrade
for uri in uri_list:
pkg_uri = ExtHandlerVersionUri()
pkg_uri.uri = uri
pkg.uris.append(pkg_uri)
pkg.isinternal = isinternal
self.pkg_list.versions.append(pkg)
# Do not extend this class
class InVMArtifactsProfile(object):
"""
deserialized json string of InVMArtifactsProfile.
It is expected to contain the following fields:
* inVMArtifactsProfileBlobSeqNo
* profileId (optional)
* onHold (optional)
* certificateThumbprint (optional)
* encryptedHealthChecks (optional)
* encryptedApplicationProfile (optional)
"""
def __init__(self, artifacts_profile):
if not textutil.is_str_none_or_whitespace(artifacts_profile):
self.__dict__.update(parse_json(artifacts_profile))
def is_on_hold(self):
# hasattr() is not available in Python 2.6
if 'onHold' in self.__dict__:
return self.onHold.lower() == 'true'
return False