diff options
| -rw-r--r-- | ChangeLog | 3 | ||||
| -rw-r--r-- | cloudinit/sources/helpers/azure.py | 26 | ||||
| -rw-r--r-- | tests/unittests/test_datasource/test_azure_helper.py | 76 | 
3 files changed, 65 insertions, 40 deletions
| @@ -76,6 +76,9 @@   - systemd: support using systemd-detect-virt to detect container     (LP: #1539016) [Martin Pitt]   - docs: fix lock_passwd documentation [Robert C Jennings] + - Azure: Handle escaped quotes in WALinuxAgentShim.find_endpoint. +   (LP: #1488891) [Dan Watkins] +  0.7.6:   - open 0.7.6   - Enable vendordata on CloudSigma datasource (LP: #1303986) diff --git a/cloudinit/sources/helpers/azure.py b/cloudinit/sources/helpers/azure.py index d90c22fd..018cac6d 100644 --- a/cloudinit/sources/helpers/azure.py +++ b/cloudinit/sources/helpers/azure.py @@ -197,6 +197,21 @@ class WALinuxAgentShim(object):              self.openssl_manager.clean_up()      @staticmethod +    def get_ip_from_lease_value(lease_value): +        unescaped_value = lease_value.replace('\\', '') +        if len(unescaped_value) > 4: +            hex_string = '' +            for hex_pair in unescaped_value.split(':'): +                if len(hex_pair) == 1: +                    hex_pair = '0' + hex_pair +                hex_string += hex_pair +            packed_bytes = struct.pack( +                '>L', int(hex_string.replace(':', ''), 16)) +        else: +            packed_bytes = unescaped_value.encode('utf-8') +        return socket.inet_ntoa(packed_bytes) + +    @staticmethod      def find_endpoint():          LOG.debug('Finding Azure endpoint...')          content = util.load_file('/var/lib/dhcp/dhclient.eth0.leases') @@ -206,16 +221,7 @@ class WALinuxAgentShim(object):                  value = line.strip(' ').split(' ', 2)[-1].strip(';\n"')          if value is None:              raise Exception('No endpoint found in DHCP config.') -        if ':' in value: -            hex_string = '' -            for hex_pair in value.split(':'): -                if len(hex_pair) == 1: -                    hex_pair = '0' + hex_pair -                hex_string += hex_pair -            value = struct.pack('>L', int(hex_string.replace(':', ''), 16)) -        else: -            value = value.encode('utf-8') -        endpoint_ip_address = socket.inet_ntoa(value) +        endpoint_ip_address = WALinuxAgentShim.get_ip_from_lease_value(value)          LOG.debug('Azure endpoint found at %s', endpoint_ip_address)          return endpoint_ip_address diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py index 0638c974..8dbdfb0b 100644 --- a/tests/unittests/test_datasource/test_azure_helper.py +++ b/tests/unittests/test_datasource/test_azure_helper.py @@ -75,48 +75,64 @@ class TestFindEndpoint(TestCase):          self.assertRaises(Exception,                            azure_helper.WALinuxAgentShim.find_endpoint) -    def _build_lease_content(self, ip_address, use_hex=True): -        ip_address_repr = ':'.join( -            [hex(int(part)).replace('0x', '') -             for part in ip_address.split('.')]) -        if not use_hex: -            ip_address_repr = struct.pack( -                '>L', int(ip_address_repr.replace(':', ''), 16)) -            ip_address_repr = '"{0}"'.format(ip_address_repr.decode('utf-8')) +    @staticmethod +    def _build_lease_content(encoded_address):          return '\n'.join([              'lease {',              ' interface "eth0";', -            ' option unknown-245 {0};'.format(ip_address_repr), +            ' option unknown-245 {0};'.format(encoded_address),              '}']) -    def test_hex_string(self): -        ip_address = '98.76.54.32' -        file_content = self._build_lease_content(ip_address) +    def test_latest_lease_used(self): +        encoded_addresses = ['5:4:3:2', '4:3:2:1'] +        file_content = '\n'.join([self._build_lease_content(encoded_address) +                                  for encoded_address in encoded_addresses])          self.load_file.return_value = file_content -        self.assertEqual(ip_address, +        self.assertEqual(encoded_addresses[-1].replace(':', '.'),                           azure_helper.WALinuxAgentShim.find_endpoint()) + +class TestExtractIpAddressFromLeaseValue(TestCase): + +    def test_hex_string(self): +        ip_address, encoded_address = '98.76.54.32', '62:4c:36:20' +        self.assertEqual( +            ip_address, +            azure_helper.WALinuxAgentShim.get_ip_from_lease_value( +                encoded_address +            )) +      def test_hex_string_with_single_character_part(self): -        ip_address = '4.3.2.1' -        file_content = self._build_lease_content(ip_address) -        self.load_file.return_value = file_content -        self.assertEqual(ip_address, -                         azure_helper.WALinuxAgentShim.find_endpoint()) +        ip_address, encoded_address = '4.3.2.1', '4:3:2:1' +        self.assertEqual( +            ip_address, +            azure_helper.WALinuxAgentShim.get_ip_from_lease_value( +                encoded_address +            ))      def test_packed_string(self): -        ip_address = '98.76.54.32' -        file_content = self._build_lease_content(ip_address, use_hex=False) -        self.load_file.return_value = file_content -        self.assertEqual(ip_address, -                         azure_helper.WALinuxAgentShim.find_endpoint()) +        ip_address, encoded_address = '98.76.54.32', 'bL6 ' +        self.assertEqual( +            ip_address, +            azure_helper.WALinuxAgentShim.get_ip_from_lease_value( +                encoded_address +            )) -    def test_latest_lease_used(self): -        ip_addresses = ['4.3.2.1', '98.76.54.32'] -        file_content = '\n'.join([self._build_lease_content(ip_address) -                                  for ip_address in ip_addresses]) -        self.load_file.return_value = file_content -        self.assertEqual(ip_addresses[-1], -                         azure_helper.WALinuxAgentShim.find_endpoint()) +    def test_packed_string_with_escaped_quote(self): +        ip_address, encoded_address = '100.72.34.108', 'dH\\"l' +        self.assertEqual( +            ip_address, +            azure_helper.WALinuxAgentShim.get_ip_from_lease_value( +                encoded_address +            )) + +    def test_packed_string_containing_a_colon(self): +        ip_address, encoded_address = '100.72.58.108', 'dH:l' +        self.assertEqual( +            ip_address, +            azure_helper.WALinuxAgentShim.get_ip_from_lease_value( +                encoded_address +            ))  class TestGoalStateParsing(TestCase): | 
