diff options
-rw-r--r-- | ChangeLog | 3 | ||||
-rw-r--r-- | cloudinit/sources/helpers/azure.py | 26 | ||||
-rw-r--r-- | tests/unittests/test_datasource/test_azure_helper.py | 76 |
3 files changed, 65 insertions, 40 deletions
@@ -76,6 +76,9 @@ - systemd: support using systemd-detect-virt to detect container (LP: #1539016) [Martin Pitt] - docs: fix lock_passwd documentation [Robert C Jennings] + - Azure: Handle escaped quotes in WALinuxAgentShim.find_endpoint. + (LP: #1488891) [Dan Watkins] + 0.7.6: - open 0.7.6 - Enable vendordata on CloudSigma datasource (LP: #1303986) diff --git a/cloudinit/sources/helpers/azure.py b/cloudinit/sources/helpers/azure.py index d90c22fd..018cac6d 100644 --- a/cloudinit/sources/helpers/azure.py +++ b/cloudinit/sources/helpers/azure.py @@ -197,6 +197,21 @@ class WALinuxAgentShim(object): self.openssl_manager.clean_up() @staticmethod + def get_ip_from_lease_value(lease_value): + unescaped_value = lease_value.replace('\\', '') + if len(unescaped_value) > 4: + hex_string = '' + for hex_pair in unescaped_value.split(':'): + if len(hex_pair) == 1: + hex_pair = '0' + hex_pair + hex_string += hex_pair + packed_bytes = struct.pack( + '>L', int(hex_string.replace(':', ''), 16)) + else: + packed_bytes = unescaped_value.encode('utf-8') + return socket.inet_ntoa(packed_bytes) + + @staticmethod def find_endpoint(): LOG.debug('Finding Azure endpoint...') content = util.load_file('/var/lib/dhcp/dhclient.eth0.leases') @@ -206,16 +221,7 @@ class WALinuxAgentShim(object): value = line.strip(' ').split(' ', 2)[-1].strip(';\n"') if value is None: raise Exception('No endpoint found in DHCP config.') - if ':' in value: - hex_string = '' - for hex_pair in value.split(':'): - if len(hex_pair) == 1: - hex_pair = '0' + hex_pair - hex_string += hex_pair - value = struct.pack('>L', int(hex_string.replace(':', ''), 16)) - else: - value = value.encode('utf-8') - endpoint_ip_address = socket.inet_ntoa(value) + endpoint_ip_address = WALinuxAgentShim.get_ip_from_lease_value(value) LOG.debug('Azure endpoint found at %s', endpoint_ip_address) return endpoint_ip_address diff --git a/tests/unittests/test_datasource/test_azure_helper.py b/tests/unittests/test_datasource/test_azure_helper.py index 0638c974..8dbdfb0b 100644 --- a/tests/unittests/test_datasource/test_azure_helper.py +++ b/tests/unittests/test_datasource/test_azure_helper.py @@ -75,48 +75,64 @@ class TestFindEndpoint(TestCase): self.assertRaises(Exception, azure_helper.WALinuxAgentShim.find_endpoint) - def _build_lease_content(self, ip_address, use_hex=True): - ip_address_repr = ':'.join( - [hex(int(part)).replace('0x', '') - for part in ip_address.split('.')]) - if not use_hex: - ip_address_repr = struct.pack( - '>L', int(ip_address_repr.replace(':', ''), 16)) - ip_address_repr = '"{0}"'.format(ip_address_repr.decode('utf-8')) + @staticmethod + def _build_lease_content(encoded_address): return '\n'.join([ 'lease {', ' interface "eth0";', - ' option unknown-245 {0};'.format(ip_address_repr), + ' option unknown-245 {0};'.format(encoded_address), '}']) - def test_hex_string(self): - ip_address = '98.76.54.32' - file_content = self._build_lease_content(ip_address) + def test_latest_lease_used(self): + encoded_addresses = ['5:4:3:2', '4:3:2:1'] + file_content = '\n'.join([self._build_lease_content(encoded_address) + for encoded_address in encoded_addresses]) self.load_file.return_value = file_content - self.assertEqual(ip_address, + self.assertEqual(encoded_addresses[-1].replace(':', '.'), azure_helper.WALinuxAgentShim.find_endpoint()) + +class TestExtractIpAddressFromLeaseValue(TestCase): + + def test_hex_string(self): + ip_address, encoded_address = '98.76.54.32', '62:4c:36:20' + self.assertEqual( + ip_address, + azure_helper.WALinuxAgentShim.get_ip_from_lease_value( + encoded_address + )) + def test_hex_string_with_single_character_part(self): - ip_address = '4.3.2.1' - file_content = self._build_lease_content(ip_address) - self.load_file.return_value = file_content - self.assertEqual(ip_address, - azure_helper.WALinuxAgentShim.find_endpoint()) + ip_address, encoded_address = '4.3.2.1', '4:3:2:1' + self.assertEqual( + ip_address, + azure_helper.WALinuxAgentShim.get_ip_from_lease_value( + encoded_address + )) def test_packed_string(self): - ip_address = '98.76.54.32' - file_content = self._build_lease_content(ip_address, use_hex=False) - self.load_file.return_value = file_content - self.assertEqual(ip_address, - azure_helper.WALinuxAgentShim.find_endpoint()) + ip_address, encoded_address = '98.76.54.32', 'bL6 ' + self.assertEqual( + ip_address, + azure_helper.WALinuxAgentShim.get_ip_from_lease_value( + encoded_address + )) - def test_latest_lease_used(self): - ip_addresses = ['4.3.2.1', '98.76.54.32'] - file_content = '\n'.join([self._build_lease_content(ip_address) - for ip_address in ip_addresses]) - self.load_file.return_value = file_content - self.assertEqual(ip_addresses[-1], - azure_helper.WALinuxAgentShim.find_endpoint()) + def test_packed_string_with_escaped_quote(self): + ip_address, encoded_address = '100.72.34.108', 'dH\\"l' + self.assertEqual( + ip_address, + azure_helper.WALinuxAgentShim.get_ip_from_lease_value( + encoded_address + )) + + def test_packed_string_containing_a_colon(self): + ip_address, encoded_address = '100.72.58.108', 'dH:l' + self.assertEqual( + ip_address, + azure_helper.WALinuxAgentShim.get_ip_from_lease_value( + encoded_address + )) class TestGoalStateParsing(TestCase): |