From 86674f013dfcea3c075ab41373ffb475881066f6 Mon Sep 17 00:00:00 2001 From: Anh Vo Date: Mon, 29 Apr 2019 20:22:16 +0000 Subject: Azure: Changes to the Hyper-V KVP Reporter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit  + Truncate KVP Pool file to prevent stale entries from being processed by the Hyper-V KVP reporter.  + Drop filtering of KVPs as it is no longer needed.  + Batch appending of existing KVP entries. --- tests/unittests/test_reporting_hyperv.py | 104 +++++++++++++++---------------- 1 file changed, 49 insertions(+), 55 deletions(-) mode change 100644 => 100755 tests/unittests/test_reporting_hyperv.py (limited to 'tests/unittests/test_reporting_hyperv.py') diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py old mode 100644 new mode 100755 index 2e64c6c7..d01ed5b3 --- a/tests/unittests/test_reporting_hyperv.py +++ b/tests/unittests/test_reporting_hyperv.py @@ -1,10 +1,12 @@ # This file is part of cloud-init. See LICENSE file for license information. from cloudinit.reporting import events -from cloudinit.reporting import handlers +from cloudinit.reporting.handlers import HyperVKvpReportingHandler import json import os +import struct +import time from cloudinit import util from cloudinit.tests.helpers import CiTestCase @@ -13,7 +15,7 @@ from cloudinit.tests.helpers import CiTestCase class TestKvpEncoding(CiTestCase): def test_encode_decode(self): kvp = {'key': 'key1', 'value': 'value1'} - kvp_reporting = handlers.HyperVKvpReportingHandler() + kvp_reporting = HyperVKvpReportingHandler() data = kvp_reporting._encode_kvp_item(kvp['key'], kvp['value']) self.assertEqual(len(data), kvp_reporting.HV_KVP_RECORD_SIZE) decoded_kvp = kvp_reporting._decode_kvp_item(data) @@ -26,57 +28,9 @@ class TextKvpReporter(CiTestCase): self.tmp_file_path = self.tmp_path('kvp_pool_file') util.ensure_file(self.tmp_file_path) - def test_event_type_can_be_filtered(self): - reporter = handlers.HyperVKvpReportingHandler( - kvp_file_path=self.tmp_file_path, - event_types=['foo', 'bar']) - - reporter.publish_event( - events.ReportingEvent('foo', 'name', 'description')) - reporter.publish_event( - events.ReportingEvent('some_other', 'name', 'description3')) - reporter.q.join() - - kvps = list(reporter._iterate_kvps(0)) - self.assertEqual(1, len(kvps)) - - reporter.publish_event( - events.ReportingEvent('bar', 'name', 'description2')) - reporter.q.join() - kvps = list(reporter._iterate_kvps(0)) - self.assertEqual(2, len(kvps)) - - self.assertIn('foo', kvps[0]['key']) - self.assertIn('bar', kvps[1]['key']) - self.assertNotIn('some_other', kvps[0]['key']) - self.assertNotIn('some_other', kvps[1]['key']) - - def test_events_are_over_written(self): - reporter = handlers.HyperVKvpReportingHandler( - kvp_file_path=self.tmp_file_path) - - self.assertEqual(0, len(list(reporter._iterate_kvps(0)))) - - reporter.publish_event( - events.ReportingEvent('foo', 'name1', 'description')) - reporter.publish_event( - events.ReportingEvent('foo', 'name2', 'description')) - reporter.q.join() - self.assertEqual(2, len(list(reporter._iterate_kvps(0)))) - - reporter2 = handlers.HyperVKvpReportingHandler( - kvp_file_path=self.tmp_file_path) - reporter2.incarnation_no = reporter.incarnation_no + 1 - reporter2.publish_event( - events.ReportingEvent('foo', 'name3', 'description')) - reporter2.q.join() - - self.assertEqual(2, len(list(reporter2._iterate_kvps(0)))) - def test_events_with_higher_incarnation_not_over_written(self): - reporter = handlers.HyperVKvpReportingHandler( + reporter = HyperVKvpReportingHandler( kvp_file_path=self.tmp_file_path) - self.assertEqual(0, len(list(reporter._iterate_kvps(0)))) reporter.publish_event( @@ -86,7 +40,7 @@ class TextKvpReporter(CiTestCase): reporter.q.join() self.assertEqual(2, len(list(reporter._iterate_kvps(0)))) - reporter3 = handlers.HyperVKvpReportingHandler( + reporter3 = HyperVKvpReportingHandler( kvp_file_path=self.tmp_file_path) reporter3.incarnation_no = reporter.incarnation_no - 1 reporter3.publish_event( @@ -95,7 +49,7 @@ class TextKvpReporter(CiTestCase): self.assertEqual(3, len(list(reporter3._iterate_kvps(0)))) def test_finish_event_result_is_logged(self): - reporter = handlers.HyperVKvpReportingHandler( + reporter = HyperVKvpReportingHandler( kvp_file_path=self.tmp_file_path) reporter.publish_event( events.FinishReportingEvent('name2', 'description1', @@ -105,7 +59,7 @@ class TextKvpReporter(CiTestCase): def test_file_operation_issue(self): os.remove(self.tmp_file_path) - reporter = handlers.HyperVKvpReportingHandler( + reporter = HyperVKvpReportingHandler( kvp_file_path=self.tmp_file_path) reporter.publish_event( events.FinishReportingEvent('name2', 'description1', @@ -113,7 +67,7 @@ class TextKvpReporter(CiTestCase): reporter.q.join() def test_event_very_long(self): - reporter = handlers.HyperVKvpReportingHandler( + reporter = HyperVKvpReportingHandler( kvp_file_path=self.tmp_file_path) description = 'ab' * reporter.HV_KVP_EXCHANGE_MAX_VALUE_SIZE long_event = events.FinishReportingEvent( @@ -132,3 +86,43 @@ class TextKvpReporter(CiTestCase): self.assertEqual(msg_slice['msg_i'], i) full_description += msg_slice['msg'] self.assertEqual(description, full_description) + + def test_not_truncate_kvp_file_modified_after_boot(self): + with open(self.tmp_file_path, "wb+") as f: + kvp = {'key': 'key1', 'value': 'value1'} + data = (struct.pack("%ds%ds" % ( + HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_KEY_SIZE, + HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_VALUE_SIZE), + kvp['key'].encode('utf-8'), kvp['value'].encode('utf-8'))) + f.write(data) + cur_time = time.time() + os.utime(self.tmp_file_path, (cur_time, cur_time)) + + # reset this because the unit test framework + # has already polluted the class variable + HyperVKvpReportingHandler._already_truncated_pool_file = False + + reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) + kvps = list(reporter._iterate_kvps(0)) + self.assertEqual(1, len(kvps)) + + def test_truncate_stale_kvp_file(self): + with open(self.tmp_file_path, "wb+") as f: + kvp = {'key': 'key1', 'value': 'value1'} + data = (struct.pack("%ds%ds" % ( + HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_KEY_SIZE, + HyperVKvpReportingHandler.HV_KVP_EXCHANGE_MAX_VALUE_SIZE), + kvp['key'].encode('utf-8'), kvp['value'].encode('utf-8'))) + f.write(data) + + # set the time ways back to make it look like + # we had an old kvp file + os.utime(self.tmp_file_path, (1000000, 1000000)) + + # reset this because the unit test framework + # has already polluted the class variable + HyperVKvpReportingHandler._already_truncated_pool_file = False + + reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) + kvps = list(reporter._iterate_kvps(0)) + self.assertEqual(0, len(kvps)) -- cgit v1.2.3 From 2f3bb764626b9065f4102c7c0a67998a9c174444 Mon Sep 17 00:00:00 2001 From: Anh Vo Date: Wed, 14 Aug 2019 21:03:13 +0000 Subject: Azure: Record boot timestamps, system information, and diagnostic events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Collect and record the following information through KVP:  + timestamps related to kernel initialization and systemd activation    of cloud-init services  + system information including cloud-init version, kernel version,    distro version, and python version  + diagnostic events for the most common provisioning error issues    such as empty dhcp lease, corrupted ovf-env.xml, etc. + increasing the log frequency of polling IMDS during reprovision. --- cloudinit/sources/DataSourceAzure.py | 157 ++++++++++++++++++++----- cloudinit/sources/helpers/azure.py | 160 ++++++++++++++++++++++++-- tests/unittests/test_datasource/test_azure.py | 15 ++- tests/unittests/test_reporting_hyperv.py | 65 +++++++++++ 4 files changed, 353 insertions(+), 44 deletions(-) mode change 100755 => 100644 tests/unittests/test_reporting_hyperv.py (limited to 'tests/unittests/test_reporting_hyperv.py') diff --git a/cloudinit/sources/DataSourceAzure.py b/cloudinit/sources/DataSourceAzure.py index e6ed2f3b..4984fa84 100755 --- a/cloudinit/sources/DataSourceAzure.py +++ b/cloudinit/sources/DataSourceAzure.py @@ -26,9 +26,14 @@ from cloudinit.url_helper import UrlError, readurl, retry_on_url_exc from cloudinit import util from cloudinit.reporting import events -from cloudinit.sources.helpers.azure import (azure_ds_reporter, - azure_ds_telemetry_reporter, - get_metadata_from_fabric) +from cloudinit.sources.helpers.azure import ( + azure_ds_reporter, + azure_ds_telemetry_reporter, + get_metadata_from_fabric, + get_boot_telemetry, + get_system_info, + report_diagnostic_event, + EphemeralDHCPv4WithReporting) LOG = logging.getLogger(__name__) @@ -354,7 +359,7 @@ class DataSourceAzure(sources.DataSource): bname = str(pk['fingerprint'] + ".crt") fp_files += [os.path.join(ddir, bname)] LOG.debug("ssh authentication: " - "using fingerprint from fabirc") + "using fingerprint from fabric") with events.ReportEventStack( name="waiting-for-ssh-public-key", @@ -419,12 +424,17 @@ class DataSourceAzure(sources.DataSource): ret = load_azure_ds_dir(cdev) except NonAzureDataSource: + report_diagnostic_event( + "Did not find Azure data source in %s" % cdev) continue except BrokenAzureDataSource as exc: msg = 'BrokenAzureDataSource: %s' % exc + report_diagnostic_event(msg) raise sources.InvalidMetaDataException(msg) except util.MountFailedError: - LOG.warning("%s was not mountable", cdev) + msg = '%s was not mountable' % cdev + report_diagnostic_event(msg) + LOG.warning(msg) continue perform_reprovision = reprovision or self._should_reprovision(ret) @@ -432,6 +442,7 @@ class DataSourceAzure(sources.DataSource): if util.is_FreeBSD(): msg = "Free BSD is not supported for PPS VMs" LOG.error(msg) + report_diagnostic_event(msg) raise sources.InvalidMetaDataException(msg) ret = self._reprovision() imds_md = get_metadata_from_imds( @@ -450,7 +461,9 @@ class DataSourceAzure(sources.DataSource): break if not found: - raise sources.InvalidMetaDataException('No Azure metadata found') + msg = 'No Azure metadata found' + report_diagnostic_event(msg) + raise sources.InvalidMetaDataException(msg) if found == ddir: LOG.debug("using files cached in %s", ddir) @@ -469,9 +482,14 @@ class DataSourceAzure(sources.DataSource): self._report_ready(lease=self._ephemeral_dhcp_ctx.lease) self._ephemeral_dhcp_ctx.clean_network() # Teardown ephemeral else: - with EphemeralDHCPv4() as lease: - self._report_ready(lease=lease) - + try: + with EphemeralDHCPv4WithReporting( + azure_ds_reporter) as lease: + self._report_ready(lease=lease) + except Exception as e: + report_diagnostic_event( + "exception while reporting ready: %s" % e) + raise return crawled_data def _is_platform_viable(self): @@ -492,6 +510,16 @@ class DataSourceAzure(sources.DataSource): """ if not self._is_platform_viable(): return False + try: + get_boot_telemetry() + except Exception as e: + LOG.warning("Failed to get boot telemetry: %s", e) + + try: + get_system_info() + except Exception as e: + LOG.warning("Failed to get system information: %s", e) + try: crawled_data = util.log_time( logfunc=LOG.debug, msg='Crawl of metadata service', @@ -551,27 +579,55 @@ class DataSourceAzure(sources.DataSource): headers = {"Metadata": "true"} nl_sock = None report_ready = bool(not os.path.isfile(REPORTED_READY_MARKER_FILE)) + self.imds_logging_threshold = 1 + self.imds_poll_counter = 1 + dhcp_attempts = 0 + vnet_switched = False + return_val = None def exc_cb(msg, exception): if isinstance(exception, UrlError) and exception.code == 404: + if self.imds_poll_counter == self.imds_logging_threshold: + # Reducing the logging frequency as we are polling IMDS + self.imds_logging_threshold *= 2 + LOG.debug("Call to IMDS with arguments %s failed " + "with status code %s after %s retries", + msg, exception.code, self.imds_poll_counter) + LOG.debug("Backing off logging threshold for the same " + "exception to %d", self.imds_logging_threshold) + self.imds_poll_counter += 1 return True + # If we get an exception while trying to call IMDS, we # call DHCP and setup the ephemeral network to acquire the new IP. + LOG.debug("Call to IMDS with arguments %s failed with " + "status code %s", msg, exception.code) + report_diagnostic_event("polling IMDS failed with exception %s" + % exception.code) return False LOG.debug("Wait for vnetswitch to happen") while True: try: - # Save our EphemeralDHCPv4 context so we avoid repeated dhcp - self._ephemeral_dhcp_ctx = EphemeralDHCPv4() - lease = self._ephemeral_dhcp_ctx.obtain_lease() + # Save our EphemeralDHCPv4 context to avoid repeated dhcp + with events.ReportEventStack( + name="obtain-dhcp-lease", + description="obtain dhcp lease", + parent=azure_ds_reporter): + self._ephemeral_dhcp_ctx = EphemeralDHCPv4() + lease = self._ephemeral_dhcp_ctx.obtain_lease() + + if vnet_switched: + dhcp_attempts += 1 if report_ready: try: nl_sock = netlink.create_bound_netlink_socket() except netlink.NetlinkCreateSocketError as e: + report_diagnostic_event(e) LOG.warning(e) self._ephemeral_dhcp_ctx.clean_network() - return + break + path = REPORTED_READY_MARKER_FILE LOG.info( "Creating a marker file to report ready: %s", path) @@ -579,17 +635,33 @@ class DataSourceAzure(sources.DataSource): pid=os.getpid(), time=time())) self._report_ready(lease=lease) report_ready = False - try: - netlink.wait_for_media_disconnect_connect( - nl_sock, lease['interface']) - except AssertionError as error: - LOG.error(error) - return + + with events.ReportEventStack( + name="wait-for-media-disconnect-connect", + description="wait for vnet switch", + parent=azure_ds_reporter): + try: + netlink.wait_for_media_disconnect_connect( + nl_sock, lease['interface']) + except AssertionError as error: + report_diagnostic_event(error) + LOG.error(error) + break + + vnet_switched = True self._ephemeral_dhcp_ctx.clean_network() else: - return readurl(url, timeout=IMDS_TIMEOUT_IN_SECONDS, - headers=headers, exception_cb=exc_cb, - infinite=True, log_req_resp=False).contents + with events.ReportEventStack( + name="get-reprovision-data-from-imds", + description="get reprovision data from imds", + parent=azure_ds_reporter): + return_val = readurl(url, + timeout=IMDS_TIMEOUT_IN_SECONDS, + headers=headers, + exception_cb=exc_cb, + infinite=True, + log_req_resp=False).contents + break except UrlError: # Teardown our EphemeralDHCPv4 context on failure as we retry self._ephemeral_dhcp_ctx.clean_network() @@ -598,6 +670,14 @@ class DataSourceAzure(sources.DataSource): if nl_sock: nl_sock.close() + if vnet_switched: + report_diagnostic_event("attempted dhcp %d times after reuse" % + dhcp_attempts) + report_diagnostic_event("polled imds %d times after reuse" % + self.imds_poll_counter) + + return return_val + @azure_ds_telemetry_reporter def _report_ready(self, lease): """Tells the fabric provisioning has completed """ @@ -666,9 +746,12 @@ class DataSourceAzure(sources.DataSource): self.ds_cfg['agent_command']) try: fabric_data = metadata_func() - except Exception: + except Exception as e: + report_diagnostic_event( + "Error communicating with Azure fabric; You may experience " + "connectivity issues: %s" % e) LOG.warning( - "Error communicating with Azure fabric; You may experience." + "Error communicating with Azure fabric; You may experience " "connectivity issues.", exc_info=True) return False @@ -1027,7 +1110,9 @@ def read_azure_ovf(contents): try: dom = minidom.parseString(contents) except Exception as e: - raise BrokenAzureDataSource("Invalid ovf-env.xml: %s" % e) + error_str = "Invalid ovf-env.xml: %s" % e + report_diagnostic_event(error_str) + raise BrokenAzureDataSource(error_str) results = find_child(dom.documentElement, lambda n: n.localName == "ProvisioningSection") @@ -1299,8 +1384,13 @@ def get_metadata_from_imds(fallback_nic, retries): if net.is_up(fallback_nic): return util.log_time(**kwargs) else: - with EphemeralDHCPv4(fallback_nic): - return util.log_time(**kwargs) + try: + with EphemeralDHCPv4WithReporting( + azure_ds_reporter, fallback_nic): + return util.log_time(**kwargs) + except Exception as e: + report_diagnostic_event("exception while getting metadata: %s" % e) + raise @azure_ds_telemetry_reporter @@ -1313,11 +1403,14 @@ def _get_metadata_from_imds(retries): url, timeout=IMDS_TIMEOUT_IN_SECONDS, headers=headers, retries=retries, exception_cb=retry_on_url_exc) except Exception as e: - LOG.debug('Ignoring IMDS instance metadata: %s', e) + msg = 'Ignoring IMDS instance metadata: %s' % e + report_diagnostic_event(msg) + LOG.debug(msg) return {} try: return util.load_json(str(response)) - except json.decoder.JSONDecodeError: + except json.decoder.JSONDecodeError as e: + report_diagnostic_event('non-json imds response' % e) LOG.warning( 'Ignoring non-json IMDS instance metadata: %s', str(response)) return {} @@ -1370,8 +1463,10 @@ def _is_platform_viable(seed_dir): asset_tag = util.read_dmi_data('chassis-asset-tag') if asset_tag == AZURE_CHASSIS_ASSET_TAG: return True - LOG.debug("Non-Azure DMI asset tag '%s' discovered.", asset_tag) - evt.description = "Non-Azure DMI asset tag '%s' discovered.", asset_tag + msg = "Non-Azure DMI asset tag '%s' discovered." % asset_tag + LOG.debug(msg) + evt.description = msg + report_diagnostic_event(msg) if os.path.exists(os.path.join(seed_dir, 'ovf-env.xml')): return True return False diff --git a/cloudinit/sources/helpers/azure.py b/cloudinit/sources/helpers/azure.py index 82c4c8c4..f1fba175 100755 --- a/cloudinit/sources/helpers/azure.py +++ b/cloudinit/sources/helpers/azure.py @@ -16,7 +16,11 @@ from xml.etree import ElementTree from cloudinit import url_helper from cloudinit import util +from cloudinit import version +from cloudinit import distros from cloudinit.reporting import events +from cloudinit.net.dhcp import EphemeralDHCPv4 +from datetime import datetime LOG = logging.getLogger(__name__) @@ -24,6 +28,10 @@ LOG = logging.getLogger(__name__) # value is applied if the endpoint can't be found within a lease file DEFAULT_WIRESERVER_ENDPOINT = "a8:3f:81:10" +BOOT_EVENT_TYPE = 'boot-telemetry' +SYSTEMINFO_EVENT_TYPE = 'system-info' +DIAGNOSTIC_EVENT_TYPE = 'diagnostic' + azure_ds_reporter = events.ReportEventStack( name="azure-ds", description="initialize reporter for azure ds", @@ -40,6 +48,105 @@ def azure_ds_telemetry_reporter(func): return impl +@azure_ds_telemetry_reporter +def get_boot_telemetry(): + """Report timestamps related to kernel initialization and systemd + activation of cloud-init""" + if not distros.uses_systemd(): + raise RuntimeError( + "distro not using systemd, skipping boot telemetry") + + LOG.debug("Collecting boot telemetry") + try: + kernel_start = float(time.time()) - float(util.uptime()) + except ValueError: + raise RuntimeError("Failed to determine kernel start timestamp") + + try: + out, _ = util.subp(['/bin/systemctl', + 'show', '-p', + 'UserspaceTimestampMonotonic'], + capture=True) + tsm = None + if out and '=' in out: + tsm = out.split("=")[1] + + if not tsm: + raise RuntimeError("Failed to parse " + "UserspaceTimestampMonotonic from systemd") + + user_start = kernel_start + (float(tsm) / 1000000) + except util.ProcessExecutionError as e: + raise RuntimeError("Failed to get UserspaceTimestampMonotonic: %s" + % e) + except ValueError as e: + raise RuntimeError("Failed to parse " + "UserspaceTimestampMonotonic from systemd: %s" + % e) + + try: + out, _ = util.subp(['/bin/systemctl', 'show', + 'cloud-init-local', '-p', + 'InactiveExitTimestampMonotonic'], + capture=True) + tsm = None + if out and '=' in out: + tsm = out.split("=")[1] + if not tsm: + raise RuntimeError("Failed to parse " + "InactiveExitTimestampMonotonic from systemd") + + cloudinit_activation = kernel_start + (float(tsm) / 1000000) + except util.ProcessExecutionError as e: + raise RuntimeError("Failed to get InactiveExitTimestampMonotonic: %s" + % e) + except ValueError as e: + raise RuntimeError("Failed to parse " + "InactiveExitTimestampMonotonic from systemd: %s" + % e) + + evt = events.ReportingEvent( + BOOT_EVENT_TYPE, 'boot-telemetry', + "kernel_start=%s user_start=%s cloudinit_activation=%s" % + (datetime.utcfromtimestamp(kernel_start).isoformat() + 'Z', + datetime.utcfromtimestamp(user_start).isoformat() + 'Z', + datetime.utcfromtimestamp(cloudinit_activation).isoformat() + 'Z'), + events.DEFAULT_EVENT_ORIGIN) + events.report_event(evt) + + # return the event for unit testing purpose + return evt + + +@azure_ds_telemetry_reporter +def get_system_info(): + """Collect and report system information""" + info = util.system_info() + evt = events.ReportingEvent( + SYSTEMINFO_EVENT_TYPE, 'system information', + "cloudinit_version=%s, kernel_version=%s, variant=%s, " + "distro_name=%s, distro_version=%s, flavor=%s, " + "python_version=%s" % + (version.version_string(), info['release'], info['variant'], + info['dist'][0], info['dist'][1], info['dist'][2], + info['python']), events.DEFAULT_EVENT_ORIGIN) + events.report_event(evt) + + # return the event for unit testing purpose + return evt + + +def report_diagnostic_event(str): + """Report a diagnostic event""" + evt = events.ReportingEvent( + DIAGNOSTIC_EVENT_TYPE, 'diagnostic message', + str, events.DEFAULT_EVENT_ORIGIN) + events.report_event(evt) + + # return the event for unit testing purpose + return evt + + @contextmanager def cd(newdir): prevdir = os.getcwd() @@ -360,16 +467,19 @@ class WALinuxAgentShim(object): value = dhcp245 LOG.debug("Using Azure Endpoint from dhcp options") if value is None: + report_diagnostic_event("No Azure endpoint from dhcp options") LOG.debug('Finding Azure endpoint from networkd...') value = WALinuxAgentShim._networkd_get_value_from_leases() if value is None: # Option-245 stored in /run/cloud-init/dhclient.hooks/.json # a dhclient exit hook that calls cloud-init-dhclient-hook + report_diagnostic_event("No Azure endpoint from networkd") LOG.debug('Finding Azure endpoint from hook json...') dhcp_options = WALinuxAgentShim._load_dhclient_json() value = WALinuxAgentShim._get_value_from_dhcpoptions(dhcp_options) if value is None: # Fallback and check the leases file if unsuccessful + report_diagnostic_event("No Azure endpoint from dhclient logs") LOG.debug("Unable to find endpoint in dhclient logs. " " Falling back to check lease files") if fallback_lease_file is None: @@ -381,11 +491,15 @@ class WALinuxAgentShim(object): value = WALinuxAgentShim._get_value_from_leases_file( fallback_lease_file) if value is None: - LOG.warning("No lease found; using default endpoint") + msg = "No lease found; using default endpoint" + report_diagnostic_event(msg) + LOG.warning(msg) value = DEFAULT_WIRESERVER_ENDPOINT endpoint_ip_address = WALinuxAgentShim.get_ip_from_lease_value(value) - LOG.debug('Azure endpoint found at %s', endpoint_ip_address) + msg = 'Azure endpoint found at %s' % endpoint_ip_address + report_diagnostic_event(msg) + LOG.debug(msg) return endpoint_ip_address @azure_ds_telemetry_reporter @@ -399,16 +513,19 @@ class WALinuxAgentShim(object): try: response = http_client.get( 'http://{0}/machine/?comp=goalstate'.format(self.endpoint)) - except Exception: + except Exception as e: if attempts < 10: time.sleep(attempts + 1) else: + report_diagnostic_event( + "failed to register with Azure: %s" % e) raise else: break attempts += 1 LOG.debug('Successfully fetched GoalState XML.') goal_state = GoalState(response.contents, http_client) + report_diagnostic_event("container_id %s" % goal_state.container_id) ssh_keys = [] if goal_state.certificates_xml is not None and pubkey_info is not None: LOG.debug('Certificate XML found; parsing out public keys.') @@ -449,11 +566,20 @@ class WALinuxAgentShim(object): container_id=goal_state.container_id, instance_id=goal_state.instance_id, ) - http_client.post( - "http://{0}/machine?comp=health".format(self.endpoint), - data=document, - extra_headers={'Content-Type': 'text/xml; charset=utf-8'}, - ) + # Host will collect kvps when cloud-init reports ready. + # some kvps might still be in the queue. We yield the scheduler + # to make sure we process all kvps up till this point. + time.sleep(0) + try: + http_client.post( + "http://{0}/machine?comp=health".format(self.endpoint), + data=document, + extra_headers={'Content-Type': 'text/xml; charset=utf-8'}, + ) + except Exception as e: + report_diagnostic_event("exception while reporting ready: %s" % e) + raise + LOG.info('Reported ready to Azure fabric.') @@ -467,4 +593,22 @@ def get_metadata_from_fabric(fallback_lease_file=None, dhcp_opts=None, finally: shim.clean_up() + +class EphemeralDHCPv4WithReporting(object): + def __init__(self, reporter, nic=None): + self.reporter = reporter + self.ephemeralDHCPv4 = EphemeralDHCPv4(iface=nic) + + def __enter__(self): + with events.ReportEventStack( + name="obtain-dhcp-lease", + description="obtain dhcp lease", + parent=self.reporter): + return self.ephemeralDHCPv4.__enter__() + + def __exit__(self, excp_type, excp_value, excp_traceback): + self.ephemeralDHCPv4.__exit__( + excp_type, excp_value, excp_traceback) + + # vi: ts=4 expandtab diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py index 4d57cebc..3547dd94 100644 --- a/tests/unittests/test_datasource/test_azure.py +++ b/tests/unittests/test_datasource/test_azure.py @@ -181,7 +181,7 @@ class TestGetMetadataFromIMDS(HttprettyTestCase): self.logs.getvalue()) @mock.patch(MOCKPATH + 'readurl') - @mock.patch(MOCKPATH + 'EphemeralDHCPv4') + @mock.patch(MOCKPATH + 'EphemeralDHCPv4WithReporting') @mock.patch(MOCKPATH + 'net.is_up') def test_get_metadata_performs_dhcp_when_network_is_down( self, m_net_is_up, m_dhcp, m_readurl): @@ -195,7 +195,7 @@ class TestGetMetadataFromIMDS(HttprettyTestCase): dsaz.get_metadata_from_imds('eth9', retries=2)) m_net_is_up.assert_called_with('eth9') - m_dhcp.assert_called_with('eth9') + m_dhcp.assert_called_with(mock.ANY, 'eth9') self.assertIn( "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time self.logs.getvalue()) @@ -552,7 +552,8 @@ scbus-1 on xpt0 bus 0 dsrc.crawl_metadata() self.assertEqual(str(cm.exception), error_msg) - @mock.patch('cloudinit.sources.DataSourceAzure.EphemeralDHCPv4') + @mock.patch( + 'cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting') @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file') @mock.patch( 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready') @@ -1308,7 +1309,9 @@ class TestAzureBounce(CiTestCase): self.assertEqual(initial_host_name, self.set_hostname.call_args_list[-1][0][0]) - def test_environment_correct_for_bounce_command(self): + @mock.patch.object(dsaz, 'get_boot_telemetry') + def test_environment_correct_for_bounce_command( + self, mock_get_boot_telemetry): interface = 'int0' hostname = 'my-new-host' old_hostname = 'my-old-host' @@ -1324,7 +1327,9 @@ class TestAzureBounce(CiTestCase): self.assertEqual(hostname, bounce_env['hostname']) self.assertEqual(old_hostname, bounce_env['old_hostname']) - def test_default_bounce_command_ifup_used_by_default(self): + @mock.patch.object(dsaz, 'get_boot_telemetry') + def test_default_bounce_command_ifup_used_by_default( + self, mock_get_boot_telemetry): cfg = {'hostname_bounce': {'policy': 'force'}} data = self.get_ovf_env_with_dscfg('some-hostname', cfg) dsrc = self._get_ds(data, agent_command=['not', '__builtin__']) diff --git a/tests/unittests/test_reporting_hyperv.py b/tests/unittests/test_reporting_hyperv.py old mode 100755 new mode 100644 index d01ed5b3..640895a4 --- a/tests/unittests/test_reporting_hyperv.py +++ b/tests/unittests/test_reporting_hyperv.py @@ -7,9 +7,12 @@ import json import os import struct import time +import re +import mock from cloudinit import util from cloudinit.tests.helpers import CiTestCase +from cloudinit.sources.helpers import azure class TestKvpEncoding(CiTestCase): @@ -126,3 +129,65 @@ class TextKvpReporter(CiTestCase): reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) kvps = list(reporter._iterate_kvps(0)) self.assertEqual(0, len(kvps)) + + @mock.patch('cloudinit.distros.uses_systemd') + @mock.patch('cloudinit.util.subp') + def test_get_boot_telemetry(self, m_subp, m_sysd): + reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) + datetime_pattern = r"\d{4}-[01]\d-[0-3]\dT[0-2]\d:[0-5]" + r"\d:[0-5]\d\.\d+([+-][0-2]\d:[0-5]\d|Z)" + + # get_boot_telemetry makes two subp calls to systemctl. We provide + # a list of values that the subp calls should return + m_subp.side_effect = [ + ('UserspaceTimestampMonotonic=1844838', ''), + ('InactiveExitTimestampMonotonic=3068203', '')] + m_sysd.return_value = True + + reporter.publish_event(azure.get_boot_telemetry()) + reporter.q.join() + kvps = list(reporter._iterate_kvps(0)) + self.assertEqual(1, len(kvps)) + + evt_msg = kvps[0]['value'] + if not re.search("kernel_start=" + datetime_pattern, evt_msg): + raise AssertionError("missing kernel_start timestamp") + if not re.search("user_start=" + datetime_pattern, evt_msg): + raise AssertionError("missing user_start timestamp") + if not re.search("cloudinit_activation=" + datetime_pattern, + evt_msg): + raise AssertionError( + "missing cloudinit_activation timestamp") + + def test_get_system_info(self): + reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) + pattern = r"[^=\s]+" + + reporter.publish_event(azure.get_system_info()) + reporter.q.join() + kvps = list(reporter._iterate_kvps(0)) + self.assertEqual(1, len(kvps)) + evt_msg = kvps[0]['value'] + + # the most important information is cloudinit version, + # kernel_version, and the distro variant. It is ok if + # if the rest is not available + if not re.search("cloudinit_version=" + pattern, evt_msg): + raise AssertionError("missing cloudinit_version string") + if not re.search("kernel_version=" + pattern, evt_msg): + raise AssertionError("missing kernel_version string") + if not re.search("variant=" + pattern, evt_msg): + raise AssertionError("missing distro variant string") + + def test_report_diagnostic_event(self): + reporter = HyperVKvpReportingHandler(kvp_file_path=self.tmp_file_path) + + reporter.publish_event( + azure.report_diagnostic_event("test_diagnostic")) + reporter.q.join() + kvps = list(reporter._iterate_kvps(0)) + self.assertEqual(1, len(kvps)) + evt_msg = kvps[0]['value'] + + if "test_diagnostic" not in evt_msg: + raise AssertionError("missing expected diagnostic message") -- cgit v1.2.3 From 92cc71dec8da73bf43d345418b73db1a32a6b8b9 Mon Sep 17 00:00:00 2001 From: momousta Date: Fri, 8 Nov 2019 15:02:07 -0600 Subject: reporting: Using a uuid to enforce uniqueness on the KVP keys. The KVPs currently being emitted to the .kvp_pool file can have duplicate keys which is wrong since these keys should be unique. The situation can occur if for example one azure function called twice or more and this function is reporting telemetry through the use of KVPs. Any KVP consumer can get confused by the duplicate keys and a race condition can and have occurred. --- cloudinit/reporting/handlers.py | 8 +++++--- tests/unittests/test_reporting_hyperv.py | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) (limited to 'tests/unittests/test_reporting_hyperv.py') diff --git a/cloudinit/reporting/handlers.py b/cloudinit/reporting/handlers.py index 10165aec..6605e795 100755 --- a/cloudinit/reporting/handlers.py +++ b/cloudinit/reporting/handlers.py @@ -1,6 +1,7 @@ # This file is part of cloud-init. See LICENSE file for license information. import abc +import uuid import fcntl import json import six @@ -201,10 +202,11 @@ class HyperVKvpReportingHandler(ReportingHandler): def _event_key(self, event): """ the event key format is: - CLOUD_INIT||| + CLOUD_INIT||||