summary refs log tree commit diff
path: root/cloudinit/sources/helpers/azure.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/sources/helpers/azure.py')
-rwxr-xr-x cloudinit/sources/helpers/azure.py 354
1 files changed, 272 insertions, 82 deletions
diff --git a/cloudinit/sources/helpers/azure.py b/cloudinit/sources/helpers/azure.py
index b968a96f..d3055d08 100755
--- a/cloudinit/sources/helpers/azure.py
+++ b/cloudinit/sources/helpers/azure.py
@@ -9,6 +9,7 @@ import struct
import time
import textwrap
import zlib
+from errno import ENOENT
from cloudinit.settings import CFG_BUILTIN
from cloudinit.net import dhcp
@@ -16,6 +17,7 @@ from cloudinit import stages
from cloudinit import temp_utils
from contextlib import contextmanager
from xml.etree import ElementTree
+from xml.sax.saxutils import escape
from cloudinit import subp
from cloudinit import url_helper
@@ -41,13 +43,19 @@ COMPRESSED_EVENT_TYPE = 'compressed'
# cloud-init.log files where the P95 of the file sizes was 537KB and the time
# consumed to dump 500KB file was (P95:76, P99:233, P99.9:1170) in ms
MAX_LOG_TO_KVP_LENGTH = 512000
-# Marker file to indicate whether cloud-init.log is pushed to KVP
-LOG_PUSHED_TO_KVP_MARKER_FILE = '/var/lib/cloud/data/log_pushed_to_kvp'
+# File to store the index of the last byte of cloud-init.log that was pushed
+# to KVP. This file will be deleted with every VM reboot.
+LOG_PUSHED_TO_KVP_INDEX_FILE = '/run/cloud-init/log_pushed_to_kvp_index'
azure_ds_reporter = events.ReportEventStack(
name="azure-ds",
description="initialize reporter for azure ds",
reporting_enabled=True)
+DEFAULT_REPORT_FAILURE_USER_VISIBLE_MESSAGE = (
+ 'The VM encountered an error during deployment. '
+ 'Please visit https://aka.ms/linuxprovisioningerror '
+ 'for more information on remediation.')
+
def azure_ds_telemetry_reporter(func):
def impl(*args, **kwargs):
@@ -180,12 +188,15 @@ def get_system_info():
return evt
-def report_diagnostic_event(str):
+def report_diagnostic_event(
+ msg: str, *, logger_func=None) -> events.ReportingEvent:
"""Report a diagnostic event"""
+ if callable(logger_func):
+ logger_func(msg)
evt = events.ReportingEvent(
DIAGNOSTIC_EVENT_TYPE, 'diagnostic message',
- str, events.DEFAULT_EVENT_ORIGIN)
- events.report_event(evt)
+ msg, events.DEFAULT_EVENT_ORIGIN)
+ events.report_event(evt, excluded_handler_types={"log"})
# return the event for unit testing purpose
return evt
@@ -211,27 +222,58 @@ def report_compressed_event(event_name, event_content):
def push_log_to_kvp(file_name=CFG_BUILTIN['def_log_file']):
"""Push a portion of cloud-init.log file or the whole file to KVP
based on the file size.
- If called more than once, it skips pushing the log file to KVP again."""
+ The first time this function is called after VM boot, it will push the last
+ n bytes of the log file such that n <= MAX_LOG_TO_KVP_LENGTH.
+ If called again on the same boot, it continues from where it left off.
+ In addition to cloud-init.log, dmesg log will also be collected."""
- log_pushed_to_kvp = bool(os.path.isfile(LOG_PUSHED_TO_KVP_MARKER_FILE))
- if log_pushed_to_kvp:
- report_diagnostic_event("cloud-init.log is already pushed to KVP")
- return
+ start_index = get_last_log_byte_pushed_to_kvp_index()
LOG.debug("Dumping cloud-init.log file to KVP")
try:
with open(file_name, "rb") as f:
f.seek(0, os.SEEK_END)
- seek_index = max(f.tell() - MAX_LOG_TO_KVP_LENGTH, 0)
+ seek_index = max(f.tell() - MAX_LOG_TO_KVP_LENGTH, start_index)
report_diagnostic_event(
- "Dumping last {} bytes of cloud-init.log file to KVP".format(
- f.tell() - seek_index))
+ "Dumping last {0} bytes of cloud-init.log file to KVP starting"
+ " from index: {1}".format(f.tell() - seek_index, seek_index),
+ logger_func=LOG.debug)
f.seek(seek_index, os.SEEK_SET)
report_compressed_event("cloud-init.log", f.read())
- util.write_file(LOG_PUSHED_TO_KVP_MARKER_FILE, '')
+ util.write_file(LOG_PUSHED_TO_KVP_INDEX_FILE, str(f.tell()))
+ except Exception as ex:
+ report_diagnostic_event(
+ "Exception when dumping log file: %s" % repr(ex),
+ logger_func=LOG.warning)
+
+ LOG.debug("Dumping dmesg log to KVP")
+ try:
+ out, _ = subp.subp(['dmesg'], decode=False, capture=True)
+ report_compressed_event("dmesg", out)
except Exception as ex:
- report_diagnostic_event("Exception when dumping log file: %s" %
- repr(ex))
+ report_diagnostic_event(
+ "Exception when dumping dmesg log: %s" % repr(ex),
+ logger_func=LOG.warning)
+
+
+@azure_ds_telemetry_reporter
+def get_last_log_byte_pushed_to_kvp_index():
+ try:
+ with open(LOG_PUSHED_TO_KVP_INDEX_FILE, "r") as f:
+ return int(f.read())
+ except IOError as e:
+ if e.errno != ENOENT:
+ report_diagnostic_event("Reading LOG_PUSHED_TO_KVP_INDEX_FILE"
+ " failed: %s." % repr(e),
+ logger_func=LOG.warning)
+ except ValueError as e:
+ report_diagnostic_event("Invalid value in LOG_PUSHED_TO_KVP_INDEX_FILE"
+ ": %s." % repr(e),
+ logger_func=LOG.warning)
+ except Exception as e:
+ report_diagnostic_event("Failed to get the last log byte pushed to KVP"
+ ": %s." % repr(e), logger_func=LOG.warning)
+ return 0
@contextmanager
@@ -252,6 +294,54 @@ def _get_dhcp_endpoint_option_name():
return azure_endpoint
+@azure_ds_telemetry_reporter
+def http_with_retries(url, **kwargs) -> str:
+ """Wrapper around url_helper.readurl() with custom telemetry logging
+ that url_helper.readurl() does not provide.
+ """
+ exc = None
+
+ max_readurl_attempts = 240
+ default_readurl_timeout = 5
+ periodic_logging_attempts = 12
+
+ if 'timeout' not in kwargs:
+ kwargs['timeout'] = default_readurl_timeout
+
+ # remove kwargs that cause url_helper.readurl to retry,
+ # since we are already implementing our own retry logic.
+ if kwargs.pop('retries', None):
+ LOG.warning(
+ 'Ignoring retries kwarg passed in for '
+ 'communication with Azure endpoint.')
+ if kwargs.pop('infinite', None):
+ LOG.warning(
+ 'Ignoring infinite kwarg passed in for communication '
+ 'with Azure endpoint.')
+
+ for attempt in range(1, max_readurl_attempts + 1):
+ try:
+ ret = url_helper.readurl(url, **kwargs)
+
+ report_diagnostic_event(
+ 'Successful HTTP request with Azure endpoint %s after '
+ '%d attempts' % (url, attempt),
+ logger_func=LOG.debug)
+
+ return ret
+
+ except Exception as e:
+ exc = e
+ if attempt % periodic_logging_attempts == 0:
+ report_diagnostic_event(
+ 'Failed HTTP request with Azure endpoint %s during '
+ 'attempt %d with exception: %s' %
+ (url, attempt, e),
+ logger_func=LOG.debug)
+
+ raise exc
+
+
class AzureEndpointHttpClient:
headers = {
@@ -270,16 +360,15 @@ class AzureEndpointHttpClient:
if secure:
headers = self.headers.copy()
headers.update(self.extra_secure_headers)
- return url_helper.readurl(url, headers=headers,
- timeout=5, retries=10, sec_between=5)
+ return http_with_retries(url, headers=headers)
def post(self, url, data=None, extra_headers=None):
headers = self.headers
if extra_headers is not None:
headers = self.headers.copy()
headers.update(extra_headers)
- return url_helper.readurl(url, data=data, headers=headers,
- timeout=5, retries=10, sec_between=5)
+ return http_with_retries(
+ url, data=data, headers=headers)
class InvalidGoalStateXMLException(Exception):
@@ -288,11 +377,16 @@ class InvalidGoalStateXMLException(Exception):
class GoalState:
- def __init__(self, unparsed_xml, azure_endpoint_client):
+ def __init__(
+ self,
+ unparsed_xml: str,
+ azure_endpoint_client: AzureEndpointHttpClient,
+ need_certificate: bool = True) -> None:
"""Parses a GoalState XML string and returns a GoalState object.
@param unparsed_xml: string representing a GoalState XML.
- @param azure_endpoint_client: instance of AzureEndpointHttpClient
+ @param azure_endpoint_client: instance of AzureEndpointHttpClient.
+ @param need_certificate: switch indicating whether certificates are needed.
@return: GoalState object representing the GoalState XML string.
"""
self.azure_endpoint_client = azure_endpoint_client
@@ -300,9 +394,9 @@ class GoalState:
try:
self.root = ElementTree.fromstring(unparsed_xml)
except ElementTree.ParseError as e:
- msg = 'Failed to parse GoalState XML: %s'
- LOG.warning(msg, e)
- report_diagnostic_event(msg % (e,))
+ report_diagnostic_event(
+ 'Failed to parse GoalState XML: %s' % e,
+ logger_func=LOG.warning)
raise
self.container_id = self._text_from_xpath('./Container/ContainerId')
@@ -312,16 +406,15 @@ class GoalState:
for attr in ("container_id", "instance_id", "incarnation"):
if getattr(self, attr) is None:
- msg = 'Missing %s in GoalState XML'
- LOG.warning(msg, attr)
- report_diagnostic_event(msg % (attr,))
+ msg = 'Missing %s in GoalState XML' % attr
+ report_diagnostic_event(msg, logger_func=LOG.warning)
raise InvalidGoalStateXMLException(msg)
self.certificates_xml = None
url = self._text_from_xpath(
'./Container/RoleInstanceList/RoleInstance'
'/Configuration/Certificates')
- if url is not None:
+ if url is not None and need_certificate:
with events.ReportEventStack(
name="get-certificates-xml",
description="get certificates xml",
@@ -349,12 +442,20 @@ class OpenSSLManager:
def __init__(self):
self.tmpdir = temp_utils.mkdtemp()
- self.certificate = None
+ self._certificate = None
self.generate_certificate()
def clean_up(self):
util.del_dir(self.tmpdir)
+ @property
+ def certificate(self):
+ return self._certificate
+
+ @certificate.setter
+ def certificate(self, value):
+ self._certificate = value
+
@azure_ds_telemetry_reporter
def generate_certificate(self):
LOG.debug('Generating certificate for communication with fabric...')
@@ -477,8 +578,15 @@ class GoalStateHealthReporter:
''')
PROVISIONING_SUCCESS_STATUS = 'Ready'
+ PROVISIONING_NOT_READY_STATUS = 'NotReady'
+ PROVISIONING_FAILURE_SUBSTATUS = 'ProvisioningFailed'
+
+ HEALTH_REPORT_DESCRIPTION_TRIM_LEN = 512
- def __init__(self, goal_state, azure_endpoint_client, endpoint):
+ def __init__(
+ self, goal_state: GoalState,
+ azure_endpoint_client: AzureEndpointHttpClient,
+ endpoint: str) -> None:
"""Creates instance that will report provisioning status to an endpoint
@param goal_state: An instance of class GoalState that contains
@@ -495,7 +603,7 @@ class GoalStateHealthReporter:
self._endpoint = endpoint
@azure_ds_telemetry_reporter
- def send_ready_signal(self):
+ def send_ready_signal(self) -> None:
document = self.build_report(
incarnation=self._goal_state.incarnation,
container_id=self._goal_state.container_id,
@@ -505,32 +613,52 @@ class GoalStateHealthReporter:
try:
self._post_health_report(document=document)
except Exception as e:
- msg = "exception while reporting ready: %s" % e
- LOG.error(msg)
- report_diagnostic_event(msg)
+ report_diagnostic_event(
+ "exception while reporting ready: %s" % e,
+ logger_func=LOG.error)
raise
LOG.info('Reported ready to Azure fabric.')
+ @azure_ds_telemetry_reporter
+ def send_failure_signal(self, description: str) -> None:
+ document = self.build_report(
+ incarnation=self._goal_state.incarnation,
+ container_id=self._goal_state.container_id,
+ instance_id=self._goal_state.instance_id,
+ status=self.PROVISIONING_NOT_READY_STATUS,
+ substatus=self.PROVISIONING_FAILURE_SUBSTATUS,
+ description=description)
+ try:
+ self._post_health_report(document=document)
+ except Exception as e:
+ msg = "exception while reporting failure: %s" % e
+ report_diagnostic_event(msg, logger_func=LOG.error)
+ raise
+
+ LOG.warning('Reported failure to Azure fabric.')
+
def build_report(
- self, incarnation, container_id, instance_id,
- status, substatus=None, description=None):
+ self, incarnation: str, container_id: str, instance_id: str,
+ status: str, substatus=None, description=None) -> str:
health_detail = ''
if substatus is not None:
health_detail = self.HEALTH_DETAIL_SUBSECTION_XML_TEMPLATE.format(
- health_substatus=substatus, health_description=description)
+ health_substatus=escape(substatus),
+ health_description=escape(
+ description[:self.HEALTH_REPORT_DESCRIPTION_TRIM_LEN]))
health_report = self.HEALTH_REPORT_XML_TEMPLATE.format(
- incarnation=incarnation,
- container_id=container_id,
- instance_id=instance_id,
- health_status=status,
+ incarnation=escape(str(incarnation)),
+ container_id=escape(container_id),
+ instance_id=escape(instance_id),
+ health_status=escape(status),
health_detail_subsection=health_detail)
return health_report
@azure_ds_telemetry_reporter
- def _post_health_report(self, document):
+ def _post_health_report(self, document: str) -> None:
push_log_to_kvp()
# Whenever report_diagnostic_event(diagnostic_msg) is invoked in code,
@@ -690,43 +818,52 @@ class WALinuxAgentShim:
value = dhcp245
LOG.debug("Using Azure Endpoint from dhcp options")
if value is None:
- report_diagnostic_event("No Azure endpoint from dhcp options")
- LOG.debug('Finding Azure endpoint from networkd...')
+ report_diagnostic_event(
+ 'No Azure endpoint from dhcp options. '
+ 'Finding Azure endpoint from networkd...',
+ logger_func=LOG.debug)
value = WALinuxAgentShim._networkd_get_value_from_leases()
if value is None:
# Option-245 stored in /run/cloud-init/dhclient.hooks/<ifc>.json
# a dhclient exit hook that calls cloud-init-dhclient-hook
- report_diagnostic_event("No Azure endpoint from networkd")
- LOG.debug('Finding Azure endpoint from hook json...')
+ report_diagnostic_event(
+ 'No Azure endpoint from networkd. '
+ 'Finding Azure endpoint from hook json...',
+ logger_func=LOG.debug)
dhcp_options = WALinuxAgentShim._load_dhclient_json()
value = WALinuxAgentShim._get_value_from_dhcpoptions(dhcp_options)
if value is None:
# Fallback and check the leases file if unsuccessful
- report_diagnostic_event("No Azure endpoint from dhclient logs")
- LOG.debug("Unable to find endpoint in dhclient logs. "
- " Falling back to check lease files")
+ report_diagnostic_event(
+ 'No Azure endpoint from dhclient logs. '
+ 'Unable to find endpoint in dhclient logs. '
+ 'Falling back to check lease files',
+ logger_func=LOG.debug)
if fallback_lease_file is None:
- LOG.warning("No fallback lease file was specified.")
+ report_diagnostic_event(
+ 'No fallback lease file was specified.',
+ logger_func=LOG.warning)
value = None
else:
- LOG.debug("Looking for endpoint in lease file %s",
- fallback_lease_file)
+ report_diagnostic_event(
+ 'Looking for endpoint in lease file %s'
+ % fallback_lease_file, logger_func=LOG.debug)
value = WALinuxAgentShim._get_value_from_leases_file(
fallback_lease_file)
if value is None:
- msg = "No lease found; using default endpoint"
- report_diagnostic_event(msg)
- LOG.warning(msg)
value = DEFAULT_WIRESERVER_ENDPOINT
+ report_diagnostic_event(
+ 'No lease found; using default endpoint: %s' % value,
+ logger_func=LOG.warning)
endpoint_ip_address = WALinuxAgentShim.get_ip_from_lease_value(value)
- msg = 'Azure endpoint found at %s' % endpoint_ip_address
- report_diagnostic_event(msg)
- LOG.debug(msg)
+ report_diagnostic_event(
+ 'Azure endpoint found at %s' % endpoint_ip_address,
+ logger_func=LOG.debug)
return endpoint_ip_address
@azure_ds_telemetry_reporter
- def register_with_azure_and_fetch_data(self, pubkey_info=None):
+ def register_with_azure_and_fetch_data(self, pubkey_info=None) -> dict:
"""Gets the VM's GoalState from Azure, uses the GoalState information
to report ready/send the ready signal/provisioning complete signal to
Azure, and then uses pubkey_info to filter and obtain the user's
@@ -737,30 +874,56 @@ class WALinuxAgentShim:
GoalState.
@return: The list of user's authorized pubkey values.
"""
- if self.openssl_manager is None:
+ http_client_certificate = None
+ if self.openssl_manager is None and pubkey_info is not None:
self.openssl_manager = OpenSSLManager()
+ http_client_certificate = self.openssl_manager.certificate
if self.azure_endpoint_client is None:
self.azure_endpoint_client = AzureEndpointHttpClient(
- self.openssl_manager.certificate)
- goal_state = self._fetch_goal_state_from_azure()
- ssh_keys = self._get_user_pubkeys(goal_state, pubkey_info)
+ http_client_certificate)
+ goal_state = self._fetch_goal_state_from_azure(
+ need_certificate=http_client_certificate is not None
+ )
+ ssh_keys = None
+ if pubkey_info is not None:
+ ssh_keys = self._get_user_pubkeys(goal_state, pubkey_info)
health_reporter = GoalStateHealthReporter(
goal_state, self.azure_endpoint_client, self.endpoint)
health_reporter.send_ready_signal()
return {'public-keys': ssh_keys}
@azure_ds_telemetry_reporter
- def _fetch_goal_state_from_azure(self):
+ def register_with_azure_and_report_failure(self, description: str) -> None:
+ """Gets the VM's GoalState from Azure, uses the GoalState information
+ to report failure/send provisioning failure signal to Azure.
+
+ @param description: user-visible error description of provisioning failure.
+ """
+ if self.azure_endpoint_client is None:
+ self.azure_endpoint_client = AzureEndpointHttpClient(None)
+ goal_state = self._fetch_goal_state_from_azure(need_certificate=False)
+ health_reporter = GoalStateHealthReporter(
+ goal_state, self.azure_endpoint_client, self.endpoint)
+ health_reporter.send_failure_signal(description=description)
+
+ @azure_ds_telemetry_reporter
+ def _fetch_goal_state_from_azure(
+ self,
+ need_certificate: bool) -> GoalState:
"""Fetches the GoalState XML from the Azure endpoint, parses the XML,
and returns a GoalState object.
+ @param need_certificate: switch indicating whether certificates are needed.
@return: GoalState object representing the GoalState XML
"""
unparsed_goal_state_xml = self._get_raw_goal_state_xml_from_azure()
- return self._parse_raw_goal_state_xml(unparsed_goal_state_xml)
+ return self._parse_raw_goal_state_xml(
+ unparsed_goal_state_xml,
+ need_certificate
+ )
@azure_ds_telemetry_reporter
- def _get_raw_goal_state_xml_from_azure(self):
+ def _get_raw_goal_state_xml_from_azure(self) -> str:
"""Fetches the GoalState XML from the Azure endpoint and returns
the XML as a string.
@@ -770,40 +933,51 @@ class WALinuxAgentShim:
LOG.info('Registering with Azure...')
url = 'http://{}/machine/?comp=goalstate'.format(self.endpoint)
try:
- response = self.azure_endpoint_client.get(url)
+ with events.ReportEventStack(
+ name="goalstate-retrieval",
+ description="retrieve goalstate",
+ parent=azure_ds_reporter):
+ response = self.azure_endpoint_client.get(url)
except Exception as e:
- msg = 'failed to register with Azure: %s' % e
- LOG.warning(msg)
- report_diagnostic_event(msg)
+ report_diagnostic_event(
+ 'failed to register with Azure and fetch GoalState XML: %s'
+ % e, logger_func=LOG.warning)
raise
LOG.debug('Successfully fetched GoalState XML.')
return response.contents
@azure_ds_telemetry_reporter
- def _parse_raw_goal_state_xml(self, unparsed_goal_state_xml):
+ def _parse_raw_goal_state_xml(
+ self,
+ unparsed_goal_state_xml: str,
+ need_certificate: bool) -> GoalState:
"""Parses a GoalState XML string and returns a GoalState object.
@param unparsed_goal_state_xml: GoalState XML string
+ @param need_certificate: switch indicating whether certificates are needed.
@return: GoalState object representing the GoalState XML
"""
try:
goal_state = GoalState(
- unparsed_goal_state_xml, self.azure_endpoint_client)
+ unparsed_goal_state_xml,
+ self.azure_endpoint_client,
+ need_certificate
+ )
except Exception as e:
- msg = 'Error processing GoalState XML: %s' % e
- LOG.warning(msg)
- report_diagnostic_event(msg)
+ report_diagnostic_event(
+ 'Error processing GoalState XML: %s' % e,
+ logger_func=LOG.warning)
raise
msg = ', '.join([
'GoalState XML container id: %s' % goal_state.container_id,
'GoalState XML instance id: %s' % goal_state.instance_id,
'GoalState XML incarnation: %s' % goal_state.incarnation])
- LOG.debug(msg)
- report_diagnostic_event(msg)
+ report_diagnostic_event(msg, logger_func=LOG.debug)
return goal_state
@azure_ds_telemetry_reporter
- def _get_user_pubkeys(self, goal_state, pubkey_info):
+ def _get_user_pubkeys(
+ self, goal_state: GoalState, pubkey_info: list) -> list:
"""Gets and filters the VM admin user's authorized pubkeys.
The admin user in this case is the username specified as "admin"
@@ -838,7 +1012,7 @@ class WALinuxAgentShim:
return ssh_keys
@staticmethod
- def _filter_pubkeys(keys_by_fingerprint, pubkey_info):
+ def _filter_pubkeys(keys_by_fingerprint: dict, pubkey_info: list) -> list:
""" Filter and return only the user's actual pubkeys.
@param keys_by_fingerprint: pubkey fingerprint -> pubkey value dict
@@ -879,9 +1053,25 @@ def get_metadata_from_fabric(fallback_lease_file=None, dhcp_opts=None,
shim.clean_up()
+@azure_ds_telemetry_reporter
+def report_failure_to_fabric(fallback_lease_file=None, dhcp_opts=None,
+ description=None):
+ shim = WALinuxAgentShim(fallback_lease_file=fallback_lease_file,
+ dhcp_options=dhcp_opts)
+ if not description:
+ description = DEFAULT_REPORT_FAILURE_USER_VISIBLE_MESSAGE
+ try:
+ shim.register_with_azure_and_report_failure(
+ description=description)
+ finally:
+ shim.clean_up()
+
+
def dhcp_log_cb(out, err):
- report_diagnostic_event("dhclient output stream: %s" % out)
- report_diagnostic_event("dhclient error stream: %s" % err)
+ report_diagnostic_event(
+ "dhclient output stream: %s" % out, logger_func=LOG.debug)
+ report_diagnostic_event(
+ "dhclient error stream: %s" % err, logger_func=LOG.debug)
class EphemeralDHCPv4WithReporting: