diff options
author | Johnson Shi <Johnson.Shi@microsoft.com> | 2020-08-13 13:50:07 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-13 16:50:07 -0400 |
commit | c3556ae82dfb47d635344fcd78908f003648d6d2 (patch) | |
tree | ef5a8dfdcfb5fd2f5df0722f8c9a32623497b586 /cloudinit/sources | |
parent | 1212675ea30feb8726e163c82127ca3cb1951f4e (diff) | |
download | vyos-cloud-init-c3556ae82dfb47d635344fcd78908f003648d6d2.tar.gz vyos-cloud-init-c3556ae82dfb47d635344fcd78908f003648d6d2.zip |
Refactor Azure report ready code (#468)
This PR refactors Azure report ready code to include more robust tests and telemetry.
Diffstat (limited to 'cloudinit/sources')
-rwxr-xr-x | cloudinit/sources/helpers/azure.py | 401 |
1 files changed, 297 insertions, 104 deletions
diff --git a/cloudinit/sources/helpers/azure.py b/cloudinit/sources/helpers/azure.py index 7afa7ed8..6df28ccf 100755 --- a/cloudinit/sources/helpers/azure.py +++ b/cloudinit/sources/helpers/azure.py @@ -195,7 +195,7 @@ def _get_dhcp_endpoint_option_name(): return azure_endpoint -class AzureEndpointHttpClient(object): +class AzureEndpointHttpClient: headers = { 'x-ms-agent-name': 'WALinuxAgent', @@ -213,57 +213,77 @@ class AzureEndpointHttpClient(object): if secure: headers = self.headers.copy() headers.update(self.extra_secure_headers) - return url_helper.read_file_or_url(url, headers=headers, timeout=5, - retries=10) + return url_helper.readurl(url, headers=headers, + timeout=5, retries=10, sec_between=5) def post(self, url, data=None, extra_headers=None): headers = self.headers if extra_headers is not None: headers = self.headers.copy() headers.update(extra_headers) - return url_helper.read_file_or_url(url, data=data, headers=headers, - timeout=5, retries=10) + return url_helper.readurl(url, data=data, headers=headers, + timeout=5, retries=10, sec_between=5) -class GoalState(object): +class InvalidGoalStateXMLException(Exception): + """Raised when GoalState XML is invalid or has missing data.""" - def __init__(self, xml, http_client): - self.http_client = http_client - self.root = ElementTree.fromstring(xml) - self._certificates_xml = None - def _text_from_xpath(self, xpath): - element = self.root.find(xpath) - if element is not None: - return element.text - return None +class GoalState: - @property - def container_id(self): - return self._text_from_xpath('./Container/ContainerId') + def __init__(self, unparsed_xml, azure_endpoint_client): + """Parses a GoalState XML string and returns a GoalState object. - @property - def incarnation(self): - return self._text_from_xpath('./Incarnation') + @param unparsed_xml: string representing a GoalState XML. + @param azure_endpoint_client: instance of AzureEndpointHttpClient + @return: GoalState object representing the GoalState XML string. + """ + self.azure_endpoint_client = azure_endpoint_client - @property - def instance_id(self): - return self._text_from_xpath( + try: + self.root = ElementTree.fromstring(unparsed_xml) + except ElementTree.ParseError as e: + msg = 'Failed to parse GoalState XML: %s' + LOG.warning(msg, e) + report_diagnostic_event(msg % (e,)) + raise + + self.container_id = self._text_from_xpath('./Container/ContainerId') + self.instance_id = self._text_from_xpath( './Container/RoleInstanceList/RoleInstance/InstanceId') + self.incarnation = self._text_from_xpath('./Incarnation') + + for attr in ("container_id", "instance_id", "incarnation"): + if getattr(self, attr) is None: + msg = 'Missing %s in GoalState XML' + LOG.warning(msg, attr) + report_diagnostic_event(msg % (attr,)) + raise InvalidGoalStateXMLException(msg) + + self.certificates_xml = None + url = self._text_from_xpath( + './Container/RoleInstanceList/RoleInstance' + '/Configuration/Certificates') + if url is not None: + with events.ReportEventStack( + name="get-certificates-xml", + description="get certificates xml", + parent=azure_ds_reporter): + self.certificates_xml = \ + self.azure_endpoint_client.get( + url, secure=True).contents + if self.certificates_xml is None: + raise InvalidGoalStateXMLException( + 'Azure endpoint returned empty certificates xml.') - @property - def certificates_xml(self): - if self._certificates_xml is None: - url = self._text_from_xpath( - './Container/RoleInstanceList/RoleInstance' - '/Configuration/Certificates') - if url is not None: - self._certificates_xml = self.http_client.get( - url, secure=True).contents - return self._certificates_xml + def _text_from_xpath(self, xpath): + element = self.root.find(xpath) + if element is not None: + return element.text + return None -class OpenSSLManager(object): +class OpenSSLManager: certificate_names = { 'private_key': 'TransportPrivate.pem', @@ -370,25 +390,120 @@ class OpenSSLManager(object): return keys -class WALinuxAgentShim(object): - - REPORT_READY_XML_TEMPLATE = '\n'.join([ - '<?xml version="1.0" encoding="utf-8"?>', - '<Health xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"' - ' xmlns:xsd="http://www.w3.org/2001/XMLSchema">', - ' <GoalStateIncarnation>{incarnation}</GoalStateIncarnation>', - ' <Container>', - ' <ContainerId>{container_id}</ContainerId>', - ' <RoleInstanceList>', - ' <Role>', - ' <InstanceId>{instance_id}</InstanceId>', - ' <Health>', - ' <State>Ready</State>', - ' </Health>', - ' </Role>', - ' </RoleInstanceList>', - ' </Container>', - '</Health>']) +class GoalStateHealthReporter: + + HEALTH_REPORT_XML_TEMPLATE = textwrap.dedent('''\ + <?xml version="1.0" encoding="utf-8"?> + <Health xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:xsd="http://www.w3.org/2001/XMLSchema"> + <GoalStateIncarnation>{incarnation}</GoalStateIncarnation> + <Container> + <ContainerId>{container_id}</ContainerId> + <RoleInstanceList> + <Role> + <InstanceId>{instance_id}</InstanceId> + <Health> + <State>{health_status}</State> + {health_detail_subsection} + </Health> + </Role> + </RoleInstanceList> + </Container> + </Health> + ''') + + HEALTH_DETAIL_SUBSECTION_XML_TEMPLATE = textwrap.dedent('''\ + <Details> + <SubStatus>{health_substatus}</SubStatus> + <Description>{health_description}</Description> + </Details> + ''') + + PROVISIONING_SUCCESS_STATUS = 'Ready' + + def __init__(self, goal_state, azure_endpoint_client, endpoint): + """Creates instance that will report provisioning status to an endpoint + + @param goal_state: An instance of class GoalState that contains + goal state info such as incarnation, container id, and instance id. + These 3 values are needed when reporting the provisioning status + to Azure + @param azure_endpoint_client: Instance of class AzureEndpointHttpClient + @param endpoint: Endpoint (string) where the provisioning status report + will be sent to + @return: Instance of class GoalStateHealthReporter + """ + self._goal_state = goal_state + self._azure_endpoint_client = azure_endpoint_client + self._endpoint = endpoint + + @azure_ds_telemetry_reporter + def send_ready_signal(self): + document = self.build_report( + incarnation=self._goal_state.incarnation, + container_id=self._goal_state.container_id, + instance_id=self._goal_state.instance_id, + status=self.PROVISIONING_SUCCESS_STATUS) + LOG.debug('Reporting ready to Azure fabric.') + try: + self._post_health_report(document=document) + except Exception as e: + msg = "exception while reporting ready: %s" % e + LOG.error(msg) + report_diagnostic_event(msg) + raise + + LOG.info('Reported ready to Azure fabric.') + + def build_report( + self, incarnation, container_id, instance_id, + status, substatus=None, description=None): + health_detail = '' + if substatus is not None: + health_detail = self.HEALTH_DETAIL_SUBSECTION_XML_TEMPLATE.format( + health_substatus=substatus, health_description=description) + + health_report = self.HEALTH_REPORT_XML_TEMPLATE.format( + incarnation=incarnation, + container_id=container_id, + instance_id=instance_id, + health_status=status, + health_detail_subsection=health_detail) + + return health_report + + @azure_ds_telemetry_reporter + def _post_health_report(self, document): + # Whenever report_diagnostic_event(diagnostic_msg) is invoked in code, + # the diagnostic messages are written to special files + # (/var/opt/hyperv/.kvp_pool_*) as Hyper-V KVP messages. + # Hyper-V KVP message communication is done through these files, + # and KVP functionality is used to communicate and share diagnostic + # info with the Azure Host. + # The Azure Host will collect the VM's Hyper-V KVP diagnostic messages + # when cloud-init reports to fabric. + # When the Azure Host receives the health report signal, it will only + # collect and process whatever KVP diagnostic messages have been + # written to the KVP files. + # KVP messages that are published after the Azure Host receives the + # signal are ignored and unprocessed, so yield this thread to the + # Hyper-V KVP Reporting thread so that they are written. + # time.sleep(0) is a low-cost and proven method to yield the scheduler + # and ensure that events are flushed. + # See HyperVKvpReportingHandler class, which is a multi-threaded + # reporting handler that writes to the special KVP files. + time.sleep(0) + + LOG.debug('Sending health report to Azure fabric.') + url = "http://{}/machine?comp=health".format(self._endpoint) + self._azure_endpoint_client.post( + url, + data=document, + extra_headers={'Content-Type': 'text/xml; charset=utf-8'}) + LOG.debug('Successfully sent health report to Azure fabric') + + +class WALinuxAgentShim: def __init__(self, fallback_lease_file=None, dhcp_options=None): LOG.debug('WALinuxAgentShim instantiated, fallback_lease_file=%s', @@ -396,6 +511,7 @@ class WALinuxAgentShim(object): self.dhcpoptions = dhcp_options self._endpoint = None self.openssl_manager = None + self.azure_endpoint_client = None self.lease_file = fallback_lease_file def clean_up(self): @@ -494,7 +610,22 @@ class WALinuxAgentShim(object): @staticmethod @azure_ds_telemetry_reporter def find_endpoint(fallback_lease_file=None, dhcp245=None): + """Finds and returns the Azure endpoint using various methods. + + The Azure endpoint is searched in the following order: + 1. Endpoint from dhcp options (dhcp option 245). + 2. Endpoint from networkd. + 3. Endpoint from dhclient hook json. + 4. Endpoint from fallback lease file. + 5. The default Azure endpoint. + + @param fallback_lease_file: Fallback lease file that will be used + during endpoint search. + @param dhcp245: dhcp options that will be used during endpoint search. + @return: Azure endpoint IP address. + """ value = None + if dhcp245 is not None: value = dhcp245 LOG.debug("Using Azure Endpoint from dhcp options") @@ -536,42 +667,128 @@ class WALinuxAgentShim(object): @azure_ds_telemetry_reporter def register_with_azure_and_fetch_data(self, pubkey_info=None): + """Gets the VM's GoalState from Azure, uses the GoalState information + to report ready/send the ready signal/provisioning complete signal to + Azure, and then uses pubkey_info to filter and obtain the user's + pubkeys from the GoalState. + + @param pubkey_info: List of pubkey values and fingerprints which are + used to filter and obtain the user's pubkey values from the + GoalState. + @return: The list of user's authorized pubkey values. + """ if self.openssl_manager is None: self.openssl_manager = OpenSSLManager() - http_client = AzureEndpointHttpClient(self.openssl_manager.certificate) + if self.azure_endpoint_client is None: + self.azure_endpoint_client = AzureEndpointHttpClient( + self.openssl_manager.certificate) + goal_state = self._fetch_goal_state_from_azure() + ssh_keys = self._get_user_pubkeys(goal_state, pubkey_info) + health_reporter = GoalStateHealthReporter( + goal_state, self.azure_endpoint_client, self.endpoint) + health_reporter.send_ready_signal() + return {'public-keys': ssh_keys} + + @azure_ds_telemetry_reporter + def _fetch_goal_state_from_azure(self): + """Fetches the GoalState XML from the Azure endpoint, parses the XML, + and returns a GoalState object. + + @return: GoalState object representing the GoalState XML + """ + unparsed_goal_state_xml = self._get_raw_goal_state_xml_from_azure() + return self._parse_raw_goal_state_xml(unparsed_goal_state_xml) + + @azure_ds_telemetry_reporter + def _get_raw_goal_state_xml_from_azure(self): + """Fetches the GoalState XML from the Azure endpoint and returns + the XML as a string. + + @return: GoalState XML string + """ + LOG.info('Registering with Azure...') - attempts = 0 - while True: - try: - response = http_client.get( - 'http://{0}/machine/?comp=goalstate'.format(self.endpoint)) - except Exception as e: - if attempts < 10: - time.sleep(attempts + 1) - else: - report_diagnostic_event( - "failed to register with Azure: %s" % e) - raise - else: - break - attempts += 1 + url = 'http://{}/machine/?comp=goalstate'.format(self.endpoint) + try: + response = self.azure_endpoint_client.get(url) + except Exception as e: + msg = 'failed to register with Azure: %s' % e + LOG.warning(msg) + report_diagnostic_event(msg) + raise LOG.debug('Successfully fetched GoalState XML.') - goal_state = GoalState(response.contents, http_client) - report_diagnostic_event("container_id %s" % goal_state.container_id) + return response.contents + + @azure_ds_telemetry_reporter + def _parse_raw_goal_state_xml(self, unparsed_goal_state_xml): + """Parses a GoalState XML string and returns a GoalState object. + + @param unparsed_goal_state_xml: GoalState XML string + @return: GoalState object representing the GoalState XML + """ + try: + goal_state = GoalState( + unparsed_goal_state_xml, self.azure_endpoint_client) + except Exception as e: + msg = 'Error processing GoalState XML: %s' % e + LOG.warning(msg) + report_diagnostic_event(msg) + raise + msg = ', '.join([ + 'GoalState XML container id: %s' % goal_state.container_id, + 'GoalState XML instance id: %s' % goal_state.instance_id, + 'GoalState XML incarnation: %s' % goal_state.incarnation]) + LOG.debug(msg) + report_diagnostic_event(msg) + return goal_state + + @azure_ds_telemetry_reporter + def _get_user_pubkeys(self, goal_state, pubkey_info): + """Gets and filters the VM admin user's authorized pubkeys. + + The admin user in this case is the username specified as "admin" + when deploying VMs on Azure. + See https://docs.microsoft.com/en-us/cli/azure/vm#az-vm-create. + cloud-init expects a straightforward array of keys to be dropped + into the admin user's authorized_keys file. Azure control plane exposes + multiple public keys to the VM via wireserver. Select just the + admin user's key(s) and return them, ignoring any other certs. + + @param goal_state: GoalState object. The GoalState object contains + a certificate XML, which contains both the VM user's authorized + pubkeys and other non-user pubkeys, which are used for + MSI and protected extension handling. + @param pubkey_info: List of VM user pubkey dicts that were previously + obtained from provisioning data. + Each pubkey dict in this list can either have the format + pubkey['value'] or pubkey['fingerprint']. + Each pubkey['fingerprint'] in the list is used to filter + and obtain the actual pubkey value from the GoalState + certificates XML. + Each pubkey['value'] requires no further processing and is + immediately added to the return list. + @return: A list of the VM user's authorized pubkey values. + """ ssh_keys = [] if goal_state.certificates_xml is not None and pubkey_info is not None: LOG.debug('Certificate XML found; parsing out public keys.') keys_by_fingerprint = self.openssl_manager.parse_certificates( goal_state.certificates_xml) ssh_keys = self._filter_pubkeys(keys_by_fingerprint, pubkey_info) - self._report_ready(goal_state, http_client) - return {'public-keys': ssh_keys} + return ssh_keys - def _filter_pubkeys(self, keys_by_fingerprint, pubkey_info): - """cloud-init expects a straightforward array of keys to be dropped - into the user's authorized_keys file. Azure control plane exposes - multiple public keys to the VM via wireserver. Select just the - user's key(s) and return them, ignoring any other certs. + @staticmethod + def _filter_pubkeys(keys_by_fingerprint, pubkey_info): + """ Filter and return only the user's actual pubkeys. + + @param keys_by_fingerprint: pubkey fingerprint -> pubkey value dict + that was obtained from GoalState Certificates XML. May contain + non-user pubkeys. + @param pubkey_info: List of VM user pubkeys. Pubkey values are added + to the return list without further processing. Pubkey fingerprints + are used to filter and obtain the actual pubkey values from + keys_by_fingerprint. + @return: A list of the VM user's authorized pubkey values. """ keys = [] for pubkey in pubkey_info: @@ -590,30 +807,6 @@ class WALinuxAgentShim(object): return keys - @azure_ds_telemetry_reporter - def _report_ready(self, goal_state, http_client): - LOG.debug('Reporting ready to Azure fabric.') - document = self.REPORT_READY_XML_TEMPLATE.format( - incarnation=goal_state.incarnation, - container_id=goal_state.container_id, - instance_id=goal_state.instance_id, - ) - # Host will collect kvps when cloud-init reports ready. - # some kvps might still be in the queue. We yield the scheduler - # to make sure we process all kvps up till this point. - time.sleep(0) - try: - http_client.post( - "http://{0}/machine?comp=health".format(self.endpoint), - data=document, - extra_headers={'Content-Type': 'text/xml; charset=utf-8'}, - ) - except Exception as e: - report_diagnostic_event("exception while reporting ready: %s" % e) - raise - - LOG.info('Reported ready to Azure fabric.') - @azure_ds_telemetry_reporter def get_metadata_from_fabric(fallback_lease_file=None, dhcp_opts=None, @@ -631,7 +824,7 @@ def dhcp_log_cb(out, err): report_diagnostic_event("dhclient error stream: %s" % err) -class EphemeralDHCPv4WithReporting(object): +class EphemeralDHCPv4WithReporting: def __init__(self, reporter, nic=None): self.reporter = reporter self.ephemeralDHCPv4 = EphemeralDHCPv4( |