diff options
| author | Johnson Shi <Johnson.Shi@microsoft.com> | 2020-08-13 13:50:07 -0700 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-08-13 16:50:07 -0400 | 
| commit | c3556ae82dfb47d635344fcd78908f003648d6d2 (patch) | |
| tree | ef5a8dfdcfb5fd2f5df0722f8c9a32623497b586 /cloudinit/sources/helpers | |
| parent | 1212675ea30feb8726e163c82127ca3cb1951f4e (diff) | |
| download | vyos-cloud-init-c3556ae82dfb47d635344fcd78908f003648d6d2.tar.gz vyos-cloud-init-c3556ae82dfb47d635344fcd78908f003648d6d2.zip | |
Refactor Azure report ready code (#468)
This PR refactors Azure report ready code to include more robust tests and telemetry.
Diffstat (limited to 'cloudinit/sources/helpers')
| -rwxr-xr-x | cloudinit/sources/helpers/azure.py | 401 | 
1 files changed, 297 insertions, 104 deletions
| diff --git a/cloudinit/sources/helpers/azure.py b/cloudinit/sources/helpers/azure.py index 7afa7ed8..6df28ccf 100755 --- a/cloudinit/sources/helpers/azure.py +++ b/cloudinit/sources/helpers/azure.py @@ -195,7 +195,7 @@ def _get_dhcp_endpoint_option_name():      return azure_endpoint -class AzureEndpointHttpClient(object): +class AzureEndpointHttpClient:      headers = {          'x-ms-agent-name': 'WALinuxAgent', @@ -213,57 +213,77 @@ class AzureEndpointHttpClient(object):          if secure:              headers = self.headers.copy()              headers.update(self.extra_secure_headers) -        return url_helper.read_file_or_url(url, headers=headers, timeout=5, -                                           retries=10) +        return url_helper.readurl(url, headers=headers, +                                  timeout=5, retries=10, sec_between=5)      def post(self, url, data=None, extra_headers=None):          headers = self.headers          if extra_headers is not None:              headers = self.headers.copy()              headers.update(extra_headers) -        return url_helper.read_file_or_url(url, data=data, headers=headers, -                                           timeout=5, retries=10) +        return url_helper.readurl(url, data=data, headers=headers, +                                  timeout=5, retries=10, sec_between=5) -class GoalState(object): +class InvalidGoalStateXMLException(Exception): +    """Raised when GoalState XML is invalid or has missing data.""" -    def __init__(self, xml, http_client): -        self.http_client = http_client -        self.root = ElementTree.fromstring(xml) -        self._certificates_xml = None -    def _text_from_xpath(self, xpath): -        element = self.root.find(xpath) -        if element is not None: -            return element.text -        return None +class GoalState: -    @property -    def container_id(self): -        return self._text_from_xpath('./Container/ContainerId') +    def __init__(self, unparsed_xml, azure_endpoint_client): +        """Parses a GoalState XML string and returns a GoalState object. -    @property -    def incarnation(self): -        return self._text_from_xpath('./Incarnation') +        @param unparsed_xml: string representing a GoalState XML. +        @param azure_endpoint_client: instance of AzureEndpointHttpClient +        @return: GoalState object representing the GoalState XML string. +        """ +        self.azure_endpoint_client = azure_endpoint_client -    @property -    def instance_id(self): -        return self._text_from_xpath( +        try: +            self.root = ElementTree.fromstring(unparsed_xml) +        except ElementTree.ParseError as e: +            msg = 'Failed to parse GoalState XML: %s' +            LOG.warning(msg, e) +            report_diagnostic_event(msg % (e,)) +            raise + +        self.container_id = self._text_from_xpath('./Container/ContainerId') +        self.instance_id = self._text_from_xpath(              './Container/RoleInstanceList/RoleInstance/InstanceId') +        self.incarnation = self._text_from_xpath('./Incarnation') + +        for attr in ("container_id", "instance_id", "incarnation"): +            if getattr(self, attr) is None: +                msg = 'Missing %s in GoalState XML' +                LOG.warning(msg, attr) +                report_diagnostic_event(msg % (attr,)) +                raise InvalidGoalStateXMLException(msg) + +        self.certificates_xml = None +        url = self._text_from_xpath( +            './Container/RoleInstanceList/RoleInstance' +            '/Configuration/Certificates') +        if url is not None: +            with events.ReportEventStack( +                    name="get-certificates-xml", +                    description="get certificates xml", +                    parent=azure_ds_reporter): +                self.certificates_xml = \ +                    self.azure_endpoint_client.get( +                        url, secure=True).contents +                if self.certificates_xml is None: +                    raise InvalidGoalStateXMLException( +                        'Azure endpoint returned empty certificates xml.') -    @property -    def certificates_xml(self): -        if self._certificates_xml is None: -            url = self._text_from_xpath( -                './Container/RoleInstanceList/RoleInstance' -                '/Configuration/Certificates') -            if url is not None: -                self._certificates_xml = self.http_client.get( -                    url, secure=True).contents -        return self._certificates_xml +    def _text_from_xpath(self, xpath): +        element = self.root.find(xpath) +        if element is not None: +            return element.text +        return None -class OpenSSLManager(object): +class OpenSSLManager:      certificate_names = {          'private_key': 'TransportPrivate.pem', @@ -370,25 +390,120 @@ class OpenSSLManager(object):          return keys -class WALinuxAgentShim(object): - -    REPORT_READY_XML_TEMPLATE = '\n'.join([ -        '<?xml version="1.0" encoding="utf-8"?>', -        '<Health xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"' -        ' xmlns:xsd="http://www.w3.org/2001/XMLSchema">', -        '  <GoalStateIncarnation>{incarnation}</GoalStateIncarnation>', -        '  <Container>', -        '    <ContainerId>{container_id}</ContainerId>', -        '    <RoleInstanceList>', -        '      <Role>', -        '        <InstanceId>{instance_id}</InstanceId>', -        '        <Health>', -        '          <State>Ready</State>', -        '        </Health>', -        '      </Role>', -        '    </RoleInstanceList>', -        '  </Container>', -        '</Health>']) +class GoalStateHealthReporter: + +    HEALTH_REPORT_XML_TEMPLATE = textwrap.dedent('''\ +        <?xml version="1.0" encoding="utf-8"?> +        <Health xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +         xmlns:xsd="http://www.w3.org/2001/XMLSchema"> +          <GoalStateIncarnation>{incarnation}</GoalStateIncarnation> +          <Container> +            <ContainerId>{container_id}</ContainerId> +            <RoleInstanceList> +              <Role> +                <InstanceId>{instance_id}</InstanceId> +                <Health> +                  <State>{health_status}</State> +                  {health_detail_subsection} +                </Health> +              </Role> +            </RoleInstanceList> +          </Container> +        </Health> +        ''') + +    HEALTH_DETAIL_SUBSECTION_XML_TEMPLATE = textwrap.dedent('''\ +        <Details> +          <SubStatus>{health_substatus}</SubStatus> +          <Description>{health_description}</Description> +        </Details> +        ''') + +    PROVISIONING_SUCCESS_STATUS = 'Ready' + +    def __init__(self, goal_state, azure_endpoint_client, endpoint): +        """Creates instance that will report provisioning status to an endpoint + +        @param goal_state: An instance of class GoalState that contains +            goal state info such as incarnation, container id, and instance id. +            These 3 values are needed when reporting the provisioning status +            to Azure +        @param azure_endpoint_client: Instance of class AzureEndpointHttpClient +        @param endpoint: Endpoint (string) where the provisioning status report +            will be sent to +        @return: Instance of class GoalStateHealthReporter +        """ +        self._goal_state = goal_state +        self._azure_endpoint_client = azure_endpoint_client +        self._endpoint = endpoint + +    @azure_ds_telemetry_reporter +    def send_ready_signal(self): +        document = self.build_report( +            incarnation=self._goal_state.incarnation, +            container_id=self._goal_state.container_id, +            instance_id=self._goal_state.instance_id, +            status=self.PROVISIONING_SUCCESS_STATUS) +        LOG.debug('Reporting ready to Azure fabric.') +        try: +            self._post_health_report(document=document) +        except Exception as e: +            msg = "exception while reporting ready: %s" % e +            LOG.error(msg) +            report_diagnostic_event(msg) +            raise + +        LOG.info('Reported ready to Azure fabric.') + +    def build_report( +            self, incarnation, container_id, instance_id, +            status, substatus=None, description=None): +        health_detail = '' +        if substatus is not None: +            health_detail = self.HEALTH_DETAIL_SUBSECTION_XML_TEMPLATE.format( +                health_substatus=substatus, health_description=description) + +        health_report = self.HEALTH_REPORT_XML_TEMPLATE.format( +            incarnation=incarnation, +            container_id=container_id, +            instance_id=instance_id, +            health_status=status, +            health_detail_subsection=health_detail) + +        return health_report + +    @azure_ds_telemetry_reporter +    def _post_health_report(self, document): +        # Whenever report_diagnostic_event(diagnostic_msg) is invoked in code, +        # the diagnostic messages are written to special files +        # (/var/opt/hyperv/.kvp_pool_*) as Hyper-V KVP messages. +        # Hyper-V KVP message communication is done through these files, +        # and KVP functionality is used to communicate and share diagnostic +        # info with the Azure Host. +        # The Azure Host will collect the VM's Hyper-V KVP diagnostic messages +        # when cloud-init reports to fabric. +        # When the Azure Host receives the health report signal, it will only +        # collect and process whatever KVP diagnostic messages have been +        # written to the KVP files. +        # KVP messages that are published after the Azure Host receives the +        # signal are ignored and unprocessed, so yield this thread to the +        # Hyper-V KVP Reporting thread so that they are written. +        # time.sleep(0) is a low-cost and proven method to yield the scheduler +        # and ensure that events are flushed. +        # See HyperVKvpReportingHandler class, which is a multi-threaded +        # reporting handler that writes to the special KVP files. +        time.sleep(0) + +        LOG.debug('Sending health report to Azure fabric.') +        url = "http://{}/machine?comp=health".format(self._endpoint) +        self._azure_endpoint_client.post( +            url, +            data=document, +            extra_headers={'Content-Type': 'text/xml; charset=utf-8'}) +        LOG.debug('Successfully sent health report to Azure fabric') + + +class WALinuxAgentShim:      def __init__(self, fallback_lease_file=None, dhcp_options=None):          LOG.debug('WALinuxAgentShim instantiated, fallback_lease_file=%s', @@ -396,6 +511,7 @@ class WALinuxAgentShim(object):          self.dhcpoptions = dhcp_options          self._endpoint = None          self.openssl_manager = None +        self.azure_endpoint_client = None          self.lease_file = fallback_lease_file      def clean_up(self): @@ -494,7 +610,22 @@ class WALinuxAgentShim(object):      @staticmethod      @azure_ds_telemetry_reporter      def find_endpoint(fallback_lease_file=None, dhcp245=None): +        """Finds and returns the Azure endpoint using various methods. + +        The Azure endpoint is searched in the following order: +        1. Endpoint from dhcp options (dhcp option 245). +        2. Endpoint from networkd. +        3. Endpoint from dhclient hook json. +        4. Endpoint from fallback lease file. +        5. The default Azure endpoint. + +        @param fallback_lease_file: Fallback lease file that will be used +            during endpoint search. +        @param dhcp245: dhcp options that will be used during endpoint search. +        @return: Azure endpoint IP address. +        """          value = None +          if dhcp245 is not None:              value = dhcp245              LOG.debug("Using Azure Endpoint from dhcp options") @@ -536,42 +667,128 @@ class WALinuxAgentShim(object):      @azure_ds_telemetry_reporter      def register_with_azure_and_fetch_data(self, pubkey_info=None): +        """Gets the VM's GoalState from Azure, uses the GoalState information +        to report ready/send the ready signal/provisioning complete signal to +        Azure, and then uses pubkey_info to filter and obtain the user's +        pubkeys from the GoalState. + +        @param pubkey_info: List of pubkey values and fingerprints which are +            used to filter and obtain the user's pubkey values from the +            GoalState. +        @return: The list of user's authorized pubkey values. +        """          if self.openssl_manager is None:              self.openssl_manager = OpenSSLManager() -        http_client = AzureEndpointHttpClient(self.openssl_manager.certificate) +        if self.azure_endpoint_client is None: +            self.azure_endpoint_client = AzureEndpointHttpClient( +                self.openssl_manager.certificate) +        goal_state = self._fetch_goal_state_from_azure() +        ssh_keys = self._get_user_pubkeys(goal_state, pubkey_info) +        health_reporter = GoalStateHealthReporter( +            goal_state, self.azure_endpoint_client, self.endpoint) +        health_reporter.send_ready_signal() +        return {'public-keys': ssh_keys} + +    @azure_ds_telemetry_reporter +    def _fetch_goal_state_from_azure(self): +        """Fetches the GoalState XML from the Azure endpoint, parses the XML, +        and returns a GoalState object. + +        @return: GoalState object representing the GoalState XML +        """ +        unparsed_goal_state_xml = self._get_raw_goal_state_xml_from_azure() +        return self._parse_raw_goal_state_xml(unparsed_goal_state_xml) + +    @azure_ds_telemetry_reporter +    def _get_raw_goal_state_xml_from_azure(self): +        """Fetches the GoalState XML from the Azure endpoint and returns +        the XML as a string. + +        @return: GoalState XML string +        """ +          LOG.info('Registering with Azure...') -        attempts = 0 -        while True: -            try: -                response = http_client.get( -                    'http://{0}/machine/?comp=goalstate'.format(self.endpoint)) -            except Exception as e: -                if attempts < 10: -                    time.sleep(attempts + 1) -                else: -                    report_diagnostic_event( -                        "failed to register with Azure: %s" % e) -                    raise -            else: -                break -            attempts += 1 +        url = 'http://{}/machine/?comp=goalstate'.format(self.endpoint) +        try: +            response = self.azure_endpoint_client.get(url) +        except Exception as e: +            msg = 'failed to register with Azure: %s' % e +            LOG.warning(msg) +            report_diagnostic_event(msg) +            raise          LOG.debug('Successfully fetched GoalState XML.') -        goal_state = GoalState(response.contents, http_client) -        report_diagnostic_event("container_id %s" % goal_state.container_id) +        return response.contents + +    @azure_ds_telemetry_reporter +    def _parse_raw_goal_state_xml(self, unparsed_goal_state_xml): +        """Parses a GoalState XML string and returns a GoalState object. + +        @param unparsed_goal_state_xml: GoalState XML string +        @return: GoalState object representing the GoalState XML +        """ +        try: +            goal_state = GoalState( +                unparsed_goal_state_xml, self.azure_endpoint_client) +        except Exception as e: +            msg = 'Error processing GoalState XML: %s' % e +            LOG.warning(msg) +            report_diagnostic_event(msg) +            raise +        msg = ', '.join([ +            'GoalState XML container id: %s' % goal_state.container_id, +            'GoalState XML instance id: %s' % goal_state.instance_id, +            'GoalState XML incarnation: %s' % goal_state.incarnation]) +        LOG.debug(msg) +        report_diagnostic_event(msg) +        return goal_state + +    @azure_ds_telemetry_reporter +    def _get_user_pubkeys(self, goal_state, pubkey_info): +        """Gets and filters the VM admin user's authorized pubkeys. + +        The admin user in this case is the username specified as "admin" +        when deploying VMs on Azure. +        See https://docs.microsoft.com/en-us/cli/azure/vm#az-vm-create. +        cloud-init expects a straightforward array of keys to be dropped +        into the admin user's authorized_keys file. Azure control plane exposes +        multiple public keys to the VM via wireserver. Select just the +        admin user's key(s) and return them, ignoring any other certs. + +        @param goal_state: GoalState object. The GoalState object contains +            a certificate XML, which contains both the VM user's authorized +            pubkeys and other non-user pubkeys, which are used for +            MSI and protected extension handling. +        @param pubkey_info: List of VM user pubkey dicts that were previously +            obtained from provisioning data. +            Each pubkey dict in this list can either have the format +            pubkey['value'] or pubkey['fingerprint']. +            Each pubkey['fingerprint'] in the list is used to filter +            and obtain the actual pubkey value from the GoalState +            certificates XML. +            Each pubkey['value'] requires no further processing and is +            immediately added to the return list. +        @return: A list of the VM user's authorized pubkey values. +        """          ssh_keys = []          if goal_state.certificates_xml is not None and pubkey_info is not None:              LOG.debug('Certificate XML found; parsing out public keys.')              keys_by_fingerprint = self.openssl_manager.parse_certificates(                  goal_state.certificates_xml)              ssh_keys = self._filter_pubkeys(keys_by_fingerprint, pubkey_info) -        self._report_ready(goal_state, http_client) -        return {'public-keys': ssh_keys} +        return ssh_keys -    def _filter_pubkeys(self, keys_by_fingerprint, pubkey_info): -        """cloud-init expects a straightforward array of keys to be dropped -           into the user's authorized_keys file. Azure control plane exposes -           multiple public keys to the VM via wireserver. Select just the -           user's key(s) and return them, ignoring any other certs. +    @staticmethod +    def _filter_pubkeys(keys_by_fingerprint, pubkey_info): +        """ Filter and return only the user's actual pubkeys. + +        @param keys_by_fingerprint: pubkey fingerprint -> pubkey value dict +            that was obtained from GoalState Certificates XML. May contain +            non-user pubkeys. +        @param pubkey_info: List of VM user pubkeys. Pubkey values are added +            to the return list without further processing. Pubkey fingerprints +            are used to filter and obtain the actual pubkey values from +            keys_by_fingerprint. +        @return: A list of the VM user's authorized pubkey values.          """          keys = []          for pubkey in pubkey_info: @@ -590,30 +807,6 @@ class WALinuxAgentShim(object):          return keys -    @azure_ds_telemetry_reporter -    def _report_ready(self, goal_state, http_client): -        LOG.debug('Reporting ready to Azure fabric.') -        document = self.REPORT_READY_XML_TEMPLATE.format( -            incarnation=goal_state.incarnation, -            container_id=goal_state.container_id, -            instance_id=goal_state.instance_id, -        ) -        # Host will collect kvps when cloud-init reports ready. -        # some kvps might still be in the queue. We yield the scheduler -        # to make sure we process all kvps up till this point. -        time.sleep(0) -        try: -            http_client.post( -                "http://{0}/machine?comp=health".format(self.endpoint), -                data=document, -                extra_headers={'Content-Type': 'text/xml; charset=utf-8'}, -            ) -        except Exception as e: -            report_diagnostic_event("exception while reporting ready: %s" % e) -            raise - -        LOG.info('Reported ready to Azure fabric.') -  @azure_ds_telemetry_reporter  def get_metadata_from_fabric(fallback_lease_file=None, dhcp_opts=None, @@ -631,7 +824,7 @@ def dhcp_log_cb(out, err):      report_diagnostic_event("dhclient error stream: %s" % err) -class EphemeralDHCPv4WithReporting(object): +class EphemeralDHCPv4WithReporting:      def __init__(self, reporter, nic=None):          self.reporter = reporter          self.ephemeralDHCPv4 = EphemeralDHCPv4( | 
