diff options
Diffstat (limited to 'tests/unittests/sources/test_azure.py')
-rw-r--r-- | tests/unittests/sources/test_azure.py | 3174 |
1 files changed, 1833 insertions, 1341 deletions
diff --git a/tests/unittests/sources/test_azure.py b/tests/unittests/sources/test_azure.py index ad8be04b..8b0762b7 100644 --- a/tests/unittests/sources/test_azure.py +++ b/tests/unittests/sources/test_azure.py @@ -1,33 +1,47 @@ # This file is part of cloud-init. See LICENSE file for license information. -from cloudinit import distros -from cloudinit import helpers -from cloudinit import url_helper -from cloudinit.sources import ( - UNSET, DataSourceAzure as dsaz, InvalidMetaDataException) -from cloudinit.util import (b64e, decode_binary, load_file, write_file, - MountFailedError, json_dumps, load_json) -from cloudinit.version import version_string as vs -from tests.unittests.helpers import ( - HttprettyTestCase, CiTestCase, populate_dir, mock, wrap_and_call, - ExitStack, resourceLocation) -from cloudinit.sources.helpers import netlink - import copy import crypt -import httpretty import json import os -import requests import stat import xml.etree.ElementTree as ET -import yaml +import httpretty +import requests +import yaml -def construct_valid_ovf_env(data=None, pubkeys=None, - userdata=None, platform_settings=None): +from cloudinit import distros, helpers, url_helper +from cloudinit.sources import UNSET +from cloudinit.sources import DataSourceAzure as dsaz +from cloudinit.sources import InvalidMetaDataException +from cloudinit.sources.helpers import netlink +from cloudinit.util import ( + MountFailedError, + b64e, + decode_binary, + json_dumps, + load_file, + load_json, + write_file, +) +from cloudinit.version import version_string as vs +from tests.unittests.helpers import ( + CiTestCase, + ExitStack, + HttprettyTestCase, + mock, + populate_dir, + resourceLocation, + wrap_and_call, +) + + +def construct_valid_ovf_env( + data=None, pubkeys=None, userdata=None, platform_settings=None +): if data is None: - data = {'HostName': 'FOOHOST'} + data = {"HostName": "FOOHOST"} if pubkeys is None: pubkeys = {} @@ -45,9 +59,14 @@ def construct_valid_ovf_env(data=None, pubkeys=None, """ for key, dval in data.items(): if isinstance(dval, dict): - val = dict(dval).get('text') - attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v - in dict(dval).items() if k != 'text']) + val = dict(dval).get("text") + attrs = " " + " ".join( + [ + "%s='%s'" % (k, v) + for k, v in dict(dval).items() + if k != "text" + ] + ) else: val = dval attrs = "" @@ -61,8 +80,10 @@ def construct_valid_ovf_env(data=None, pubkeys=None, for fp, path, value in pubkeys: content += " <PublicKey>" if fp and path: - content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" % - (fp, path)) + content += "<Fingerprint>%s</Fingerprint><Path>%s</Path>" % ( + fp, + path, + ) if value: content += "<Value>%s</Value>" % value content += "</PublicKey>\n" @@ -106,300 +127,331 @@ NETWORK_METADATA = { "vmScaleSetName": "", "vmSize": "Standard_DS1_v2", "zone": "", - "publicKeys": [ - { - "keyData": "ssh-rsa key1", - "path": "path1" - } - ] + "publicKeys": [{"keyData": "ssh-rsa key1", "path": "path1"}], }, "network": { "interface": [ { "macAddress": "000D3A047598", - "ipv6": { - "ipAddress": [] - }, + "ipv6": {"ipAddress": []}, "ipv4": { - "subnet": [ - { - "prefix": "24", - "address": "10.0.0.0" - } - ], + "subnet": [{"prefix": "24", "address": "10.0.0.0"}], "ipAddress": [ { "privateIpAddress": "10.0.0.4", - "publicIpAddress": "104.46.124.81" + "publicIpAddress": "104.46.124.81", } - ] - } + ], + }, } ] - } + }, } SECONDARY_INTERFACE = { "macAddress": "220D3A047598", - "ipv6": { - "ipAddress": [] - }, + "ipv6": {"ipAddress": []}, "ipv4": { - "subnet": [ - { - "prefix": "24", - "address": "10.0.1.0" - } - ], + "subnet": [{"prefix": "24", "address": "10.0.1.0"}], "ipAddress": [ { "privateIpAddress": "10.0.1.5", } - ] - } + ], + }, } SECONDARY_INTERFACE_NO_IP = { "macAddress": "220D3A047598", - "ipv6": { - "ipAddress": [] - }, + "ipv6": {"ipAddress": []}, "ipv4": { - "subnet": [ - { - "prefix": "24", - "address": "10.0.1.0" - } - ], - "ipAddress": [] - } + "subnet": [{"prefix": "24", "address": "10.0.1.0"}], + "ipAddress": [], + }, } IMDS_NETWORK_METADATA = { "interface": [ { "macAddress": "000D3A047598", - "ipv6": { - "ipAddress": [] - }, + "ipv6": {"ipAddress": []}, "ipv4": { - "subnet": [ - { - "prefix": "24", - "address": "10.0.0.0" - } - ], + "subnet": [{"prefix": "24", "address": "10.0.0.0"}], "ipAddress": [ { "privateIpAddress": "10.0.0.4", - "publicIpAddress": "104.46.124.81" + "publicIpAddress": "104.46.124.81", } - ] - } + ], + }, } ] } -MOCKPATH = 'cloudinit.sources.DataSourceAzure.' -EXAMPLE_UUID = 'd0df4c54-4ecb-4a4b-9954-5bdf3ed5c3b8' +MOCKPATH = "cloudinit.sources.DataSourceAzure." +EXAMPLE_UUID = "d0df4c54-4ecb-4a4b-9954-5bdf3ed5c3b8" class TestParseNetworkConfig(CiTestCase): maxDiff = None fallback_config = { - 'version': 1, - 'config': [{ - 'type': 'physical', 'name': 'eth0', - 'mac_address': '00:11:22:33:44:55', - 'params': {'driver': 'hv_netsvc'}, - 'subnets': [{'type': 'dhcp'}], - }] + "version": 1, + "config": [ + { + "type": "physical", + "name": "eth0", + "mac_address": "00:11:22:33:44:55", + "params": {"driver": "hv_netsvc"}, + "subnets": [{"type": "dhcp"}], + } + ], } - @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', - return_value=None) + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) def test_single_ipv4_nic_configuration(self, m_driver): """parse_network_config emits dhcp on single nic with ipv4""" - expected = {'ethernets': { - 'eth0': {'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 100}, - 'dhcp6': False, - 'match': {'macaddress': '00:0d:3a:04:75:98'}, - 'set-name': 'eth0'}}, 'version': 2} + expected = { + "ethernets": { + "eth0": { + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": False, + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "set-name": "eth0", + } + }, + "version": 2, + } self.assertEqual(expected, dsaz.parse_network_config(NETWORK_METADATA)) - @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', - return_value=None) + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) def test_increases_route_metric_for_non_primary_nics(self, m_driver): """parse_network_config increases route-metric for each nic""" - expected = {'ethernets': { - 'eth0': {'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 100}, - 'dhcp6': False, - 'match': {'macaddress': '00:0d:3a:04:75:98'}, - 'set-name': 'eth0'}, - 'eth1': {'set-name': 'eth1', - 'match': {'macaddress': '22:0d:3a:04:75:98'}, - 'dhcp6': False, - 'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 200}}, - 'eth2': {'set-name': 'eth2', - 'match': {'macaddress': '33:0d:3a:04:75:98'}, - 'dhcp6': False, - 'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 300}}}, 'version': 2} + expected = { + "ethernets": { + "eth0": { + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": False, + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "set-name": "eth0", + }, + "eth1": { + "set-name": "eth1", + "match": {"macaddress": "22:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 200}, + }, + "eth2": { + "set-name": "eth2", + "match": {"macaddress": "33:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 300}, + }, + }, + "version": 2, + } imds_data = copy.deepcopy(NETWORK_METADATA) - imds_data['network']['interface'].append(SECONDARY_INTERFACE) + imds_data["network"]["interface"].append(SECONDARY_INTERFACE) third_intf = copy.deepcopy(SECONDARY_INTERFACE) - third_intf['macAddress'] = third_intf['macAddress'].replace('22', '33') - third_intf['ipv4']['subnet'][0]['address'] = '10.0.2.0' - third_intf['ipv4']['ipAddress'][0]['privateIpAddress'] = '10.0.2.6' - imds_data['network']['interface'].append(third_intf) + third_intf["macAddress"] = third_intf["macAddress"].replace("22", "33") + third_intf["ipv4"]["subnet"][0]["address"] = "10.0.2.0" + third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6" + imds_data["network"]["interface"].append(third_intf) self.assertEqual(expected, dsaz.parse_network_config(imds_data)) - @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', - return_value=None) + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) def test_ipv4_and_ipv6_route_metrics_match_for_nics(self, m_driver): """parse_network_config emits matching ipv4 and ipv6 route-metrics.""" - expected = {'ethernets': { - 'eth0': {'addresses': ['10.0.0.5/24', '2001:dead:beef::2/128'], - 'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 100}, - 'dhcp6': True, - 'dhcp6-overrides': {'route-metric': 100}, - 'match': {'macaddress': '00:0d:3a:04:75:98'}, - 'set-name': 'eth0'}, - 'eth1': {'set-name': 'eth1', - 'match': {'macaddress': '22:0d:3a:04:75:98'}, - 'dhcp4': True, - 'dhcp6': False, - 'dhcp4-overrides': {'route-metric': 200}}, - 'eth2': {'set-name': 'eth2', - 'match': {'macaddress': '33:0d:3a:04:75:98'}, - 'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 300}, - 'dhcp6': True, - 'dhcp6-overrides': {'route-metric': 300}}}, 'version': 2} + expected = { + "ethernets": { + "eth0": { + "addresses": ["10.0.0.5/24", "2001:dead:beef::2/128"], + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": True, + "dhcp6-overrides": {"route-metric": 100}, + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "set-name": "eth0", + }, + "eth1": { + "set-name": "eth1", + "match": {"macaddress": "22:0d:3a:04:75:98"}, + "dhcp4": True, + "dhcp6": False, + "dhcp4-overrides": {"route-metric": 200}, + }, + "eth2": { + "set-name": "eth2", + "match": {"macaddress": "33:0d:3a:04:75:98"}, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 300}, + "dhcp6": True, + "dhcp6-overrides": {"route-metric": 300}, + }, + }, + "version": 2, + } imds_data = copy.deepcopy(NETWORK_METADATA) - nic1 = imds_data['network']['interface'][0] - nic1['ipv4']['ipAddress'].append({'privateIpAddress': '10.0.0.5'}) + nic1 = imds_data["network"]["interface"][0] + nic1["ipv4"]["ipAddress"].append({"privateIpAddress": "10.0.0.5"}) - nic1['ipv6'] = { + nic1["ipv6"] = { "subnet": [{"address": "2001:dead:beef::16"}], - "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}, - {"privateIpAddress": "2001:dead:beef::2"}] + "ipAddress": [ + {"privateIpAddress": "2001:dead:beef::1"}, + {"privateIpAddress": "2001:dead:beef::2"}, + ], } - imds_data['network']['interface'].append(SECONDARY_INTERFACE) + imds_data["network"]["interface"].append(SECONDARY_INTERFACE) third_intf = copy.deepcopy(SECONDARY_INTERFACE) - third_intf['macAddress'] = third_intf['macAddress'].replace('22', '33') - third_intf['ipv4']['subnet'][0]['address'] = '10.0.2.0' - third_intf['ipv4']['ipAddress'][0]['privateIpAddress'] = '10.0.2.6' - third_intf['ipv6'] = { + third_intf["macAddress"] = third_intf["macAddress"].replace("22", "33") + third_intf["ipv4"]["subnet"][0]["address"] = "10.0.2.0" + third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6" + third_intf["ipv6"] = { "subnet": [{"prefix": "64", "address": "2001:dead:beef::2"}], - "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}] + "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}], } - imds_data['network']['interface'].append(third_intf) + imds_data["network"]["interface"].append(third_intf) self.assertEqual(expected, dsaz.parse_network_config(imds_data)) - @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', - return_value=None) + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) def test_ipv4_secondary_ips_will_be_static_addrs(self, m_driver): """parse_network_config emits primary ipv4 as dhcp others are static""" - expected = {'ethernets': { - 'eth0': {'addresses': ['10.0.0.5/24'], - 'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 100}, - 'dhcp6': True, - 'dhcp6-overrides': {'route-metric': 100}, - 'match': {'macaddress': '00:0d:3a:04:75:98'}, - 'set-name': 'eth0'}}, 'version': 2} + expected = { + "ethernets": { + "eth0": { + "addresses": ["10.0.0.5/24"], + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": True, + "dhcp6-overrides": {"route-metric": 100}, + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "set-name": "eth0", + } + }, + "version": 2, + } imds_data = copy.deepcopy(NETWORK_METADATA) - nic1 = imds_data['network']['interface'][0] - nic1['ipv4']['ipAddress'].append({'privateIpAddress': '10.0.0.5'}) + nic1 = imds_data["network"]["interface"][0] + nic1["ipv4"]["ipAddress"].append({"privateIpAddress": "10.0.0.5"}) - nic1['ipv6'] = { + nic1["ipv6"] = { "subnet": [{"prefix": "10", "address": "2001:dead:beef::16"}], - "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}] + "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}], } self.assertEqual(expected, dsaz.parse_network_config(imds_data)) - @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', - return_value=None) + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) def test_ipv6_secondary_ips_will_be_static_cidrs(self, m_driver): """parse_network_config emits primary ipv6 as dhcp others are static""" - expected = {'ethernets': { - 'eth0': {'addresses': ['10.0.0.5/24', '2001:dead:beef::2/10'], - 'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 100}, - 'dhcp6': True, - 'dhcp6-overrides': {'route-metric': 100}, - 'match': {'macaddress': '00:0d:3a:04:75:98'}, - 'set-name': 'eth0'}}, 'version': 2} + expected = { + "ethernets": { + "eth0": { + "addresses": ["10.0.0.5/24", "2001:dead:beef::2/10"], + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": True, + "dhcp6-overrides": {"route-metric": 100}, + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "set-name": "eth0", + } + }, + "version": 2, + } imds_data = copy.deepcopy(NETWORK_METADATA) - nic1 = imds_data['network']['interface'][0] - nic1['ipv4']['ipAddress'].append({'privateIpAddress': '10.0.0.5'}) + nic1 = imds_data["network"]["interface"][0] + nic1["ipv4"]["ipAddress"].append({"privateIpAddress": "10.0.0.5"}) # Secondary ipv6 addresses currently ignored/unconfigured - nic1['ipv6'] = { + nic1["ipv6"] = { "subnet": [{"prefix": "10", "address": "2001:dead:beef::16"}], - "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}, - {"privateIpAddress": "2001:dead:beef::2"}] + "ipAddress": [ + {"privateIpAddress": "2001:dead:beef::1"}, + {"privateIpAddress": "2001:dead:beef::2"}, + ], } self.assertEqual(expected, dsaz.parse_network_config(imds_data)) - @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', - return_value='hv_netvsc') + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", + return_value="hv_netvsc", + ) def test_match_driver_for_netvsc(self, m_driver): """parse_network_config emits driver when using netvsc.""" - expected = {'ethernets': { - 'eth0': { - 'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 100}, - 'dhcp6': False, - 'match': { - 'macaddress': '00:0d:3a:04:75:98', - 'driver': 'hv_netvsc', - }, - 'set-name': 'eth0' - }}, 'version': 2} + expected = { + "ethernets": { + "eth0": { + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": False, + "match": { + "macaddress": "00:0d:3a:04:75:98", + "driver": "hv_netvsc", + }, + "set-name": "eth0", + } + }, + "version": 2, + } self.assertEqual(expected, dsaz.parse_network_config(NETWORK_METADATA)) - @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', - return_value=None) - @mock.patch('cloudinit.net.generate_fallback_config') + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + @mock.patch("cloudinit.net.generate_fallback_config") def test_parse_network_config_uses_fallback_cfg_when_no_network_metadata( - self, m_fallback_config, m_driver): + self, m_fallback_config, m_driver + ): """parse_network_config generates fallback network config when the IMDS instance metadata is corrupted/invalid, such as when network metadata is not present. """ imds_metadata_missing_network_metadata = copy.deepcopy( - NETWORK_METADATA) - del imds_metadata_missing_network_metadata['network'] + NETWORK_METADATA + ) + del imds_metadata_missing_network_metadata["network"] m_fallback_config.return_value = self.fallback_config self.assertEqual( self.fallback_config, - dsaz.parse_network_config( - imds_metadata_missing_network_metadata)) + dsaz.parse_network_config(imds_metadata_missing_network_metadata), + ) - @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', - return_value=None) - @mock.patch('cloudinit.net.generate_fallback_config') + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + @mock.patch("cloudinit.net.generate_fallback_config") def test_parse_network_config_uses_fallback_cfg_when_no_interface_metadata( - self, m_fallback_config, m_driver): + self, m_fallback_config, m_driver + ): """parse_network_config generates fallback network config when the IMDS instance metadata is corrupted/invalid, such as when network interface metadata is not present. """ imds_metadata_missing_interface_metadata = copy.deepcopy( - NETWORK_METADATA) - del imds_metadata_missing_interface_metadata['network']['interface'] + NETWORK_METADATA + ) + del imds_metadata_missing_interface_metadata["network"]["interface"] m_fallback_config.return_value = self.fallback_config self.assertEqual( self.fallback_config, dsaz.parse_network_config( - imds_metadata_missing_interface_metadata)) + imds_metadata_missing_interface_metadata + ), + ) class TestGetMetadataFromIMDS(HttprettyTestCase): @@ -412,175 +464,218 @@ class TestGetMetadataFromIMDS(HttprettyTestCase): dsaz.IMDS_URL ) - @mock.patch(MOCKPATH + 'readurl') - @mock.patch(MOCKPATH + 'EphemeralDHCPv4', autospec=True) - @mock.patch(MOCKPATH + 'net.is_up', autospec=True) + @mock.patch(MOCKPATH + "readurl") + @mock.patch(MOCKPATH + "EphemeralDHCPv4", autospec=True) + @mock.patch(MOCKPATH + "net.is_up", autospec=True) def test_get_metadata_does_not_dhcp_if_network_is_up( - self, m_net_is_up, m_dhcp, m_readurl): + self, m_net_is_up, m_dhcp, m_readurl + ): """Do not perform DHCP setup when nic is already up.""" m_net_is_up.return_value = True m_readurl.return_value = url_helper.StringResponse( - json.dumps(NETWORK_METADATA).encode('utf-8')) + json.dumps(NETWORK_METADATA).encode("utf-8") + ) self.assertEqual( - NETWORK_METADATA, - dsaz.get_metadata_from_imds('eth9', retries=3)) + NETWORK_METADATA, dsaz.get_metadata_from_imds("eth9", retries=3) + ) - m_net_is_up.assert_called_with('eth9') + m_net_is_up.assert_called_with("eth9") m_dhcp.assert_not_called() self.assertIn( "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time - self.logs.getvalue()) + self.logs.getvalue(), + ) - @mock.patch(MOCKPATH + 'readurl', autospec=True) - @mock.patch(MOCKPATH + 'EphemeralDHCPv4') - @mock.patch(MOCKPATH + 'net.is_up') + @mock.patch(MOCKPATH + "readurl", autospec=True) + @mock.patch(MOCKPATH + "EphemeralDHCPv4") + @mock.patch(MOCKPATH + "net.is_up") def test_get_metadata_uses_instance_url( - self, m_net_is_up, m_dhcp, m_readurl): + self, m_net_is_up, m_dhcp, m_readurl + ): """Make sure readurl is called with the correct url when accessing metadata""" m_net_is_up.return_value = True m_readurl.return_value = url_helper.StringResponse( - json.dumps(IMDS_NETWORK_METADATA).encode('utf-8')) + json.dumps(IMDS_NETWORK_METADATA).encode("utf-8") + ) dsaz.get_metadata_from_imds( - 'eth0', retries=3, md_type=dsaz.metadata_type.all) + "eth0", retries=3, md_type=dsaz.metadata_type.all + ) m_readurl.assert_called_with( - "http://169.254.169.254/metadata/instance?api-version=" - "2019-06-01", exception_cb=mock.ANY, - headers=mock.ANY, retries=mock.ANY, - timeout=mock.ANY, infinite=False) + "http://169.254.169.254/metadata/instance?api-version=2019-06-01", + exception_cb=mock.ANY, + headers=mock.ANY, + retries=mock.ANY, + timeout=mock.ANY, + infinite=False, + ) - @mock.patch(MOCKPATH + 'readurl', autospec=True) - @mock.patch(MOCKPATH + 'EphemeralDHCPv4') - @mock.patch(MOCKPATH + 'net.is_up') + @mock.patch(MOCKPATH + "readurl", autospec=True) + @mock.patch(MOCKPATH + "EphemeralDHCPv4") + @mock.patch(MOCKPATH + "net.is_up") def test_get_network_metadata_uses_network_url( - self, m_net_is_up, m_dhcp, m_readurl): + self, m_net_is_up, m_dhcp, m_readurl + ): """Make sure readurl is called with the correct url when accessing network metadata""" m_net_is_up.return_value = True m_readurl.return_value = url_helper.StringResponse( - json.dumps(IMDS_NETWORK_METADATA).encode('utf-8')) + json.dumps(IMDS_NETWORK_METADATA).encode("utf-8") + ) dsaz.get_metadata_from_imds( - 'eth0', retries=3, md_type=dsaz.metadata_type.network) + "eth0", retries=3, md_type=dsaz.metadata_type.network + ) m_readurl.assert_called_with( "http://169.254.169.254/metadata/instance/network?api-version=" - "2019-06-01", exception_cb=mock.ANY, - headers=mock.ANY, retries=mock.ANY, - timeout=mock.ANY, infinite=False) + "2019-06-01", + exception_cb=mock.ANY, + headers=mock.ANY, + retries=mock.ANY, + timeout=mock.ANY, + infinite=False, + ) - @mock.patch(MOCKPATH + 'readurl', autospec=True) - @mock.patch(MOCKPATH + 'EphemeralDHCPv4') - @mock.patch(MOCKPATH + 'net.is_up') + @mock.patch(MOCKPATH + "readurl", autospec=True) + @mock.patch(MOCKPATH + "EphemeralDHCPv4") + @mock.patch(MOCKPATH + "net.is_up") def test_get_default_metadata_uses_instance_url( - self, m_net_is_up, m_dhcp, m_readurl): + self, m_net_is_up, m_dhcp, m_readurl + ): """Make sure readurl is called with the correct url when accessing metadata""" m_net_is_up.return_value = True m_readurl.return_value = url_helper.StringResponse( - json.dumps(IMDS_NETWORK_METADATA).encode('utf-8')) + json.dumps(IMDS_NETWORK_METADATA).encode("utf-8") + ) - dsaz.get_metadata_from_imds( - 'eth0', retries=3) + dsaz.get_metadata_from_imds("eth0", retries=3) m_readurl.assert_called_with( - "http://169.254.169.254/metadata/instance?api-version=" - "2019-06-01", exception_cb=mock.ANY, - headers=mock.ANY, retries=mock.ANY, - timeout=mock.ANY, infinite=False) + "http://169.254.169.254/metadata/instance?api-version=2019-06-01", + exception_cb=mock.ANY, + headers=mock.ANY, + retries=mock.ANY, + timeout=mock.ANY, + infinite=False, + ) - @mock.patch(MOCKPATH + 'readurl', autospec=True) - @mock.patch(MOCKPATH + 'EphemeralDHCPv4') - @mock.patch(MOCKPATH + 'net.is_up') + @mock.patch(MOCKPATH + "readurl", autospec=True) + @mock.patch(MOCKPATH + "EphemeralDHCPv4") + @mock.patch(MOCKPATH + "net.is_up") def test_get_metadata_uses_extended_url( - self, m_net_is_up, m_dhcp, m_readurl): + self, m_net_is_up, m_dhcp, m_readurl + ): """Make sure readurl is called with the correct url when accessing metadata""" m_net_is_up.return_value = True m_readurl.return_value = url_helper.StringResponse( - json.dumps(IMDS_NETWORK_METADATA).encode('utf-8')) + json.dumps(IMDS_NETWORK_METADATA).encode("utf-8") + ) dsaz.get_metadata_from_imds( - 'eth0', retries=3, md_type=dsaz.metadata_type.all, - api_version="2021-08-01") + "eth0", + retries=3, + md_type=dsaz.metadata_type.all, + api_version="2021-08-01", + ) m_readurl.assert_called_with( "http://169.254.169.254/metadata/instance?api-version=" - "2021-08-01&extended=true", exception_cb=mock.ANY, - headers=mock.ANY, retries=mock.ANY, - timeout=mock.ANY, infinite=False) + "2021-08-01&extended=true", + exception_cb=mock.ANY, + headers=mock.ANY, + retries=mock.ANY, + timeout=mock.ANY, + infinite=False, + ) - @mock.patch(MOCKPATH + 'readurl', autospec=True) - @mock.patch(MOCKPATH + 'EphemeralDHCPv4WithReporting', autospec=True) - @mock.patch(MOCKPATH + 'net.is_up', autospec=True) + @mock.patch(MOCKPATH + "readurl", autospec=True) + @mock.patch(MOCKPATH + "EphemeralDHCPv4WithReporting", autospec=True) + @mock.patch(MOCKPATH + "net.is_up", autospec=True) def test_get_metadata_performs_dhcp_when_network_is_down( - self, m_net_is_up, m_dhcp, m_readurl): + self, m_net_is_up, m_dhcp, m_readurl + ): """Perform DHCP setup when nic is not up.""" m_net_is_up.return_value = False m_readurl.return_value = url_helper.StringResponse( - json.dumps(NETWORK_METADATA).encode('utf-8')) + json.dumps(NETWORK_METADATA).encode("utf-8") + ) self.assertEqual( - NETWORK_METADATA, - dsaz.get_metadata_from_imds('eth9', retries=2)) + NETWORK_METADATA, dsaz.get_metadata_from_imds("eth9", retries=2) + ) - m_net_is_up.assert_called_with('eth9') - m_dhcp.assert_called_with(mock.ANY, 'eth9') + m_net_is_up.assert_called_with("eth9") + m_dhcp.assert_called_with(mock.ANY, "eth9") self.assertIn( "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time - self.logs.getvalue()) + self.logs.getvalue(), + ) m_readurl.assert_called_with( - self.network_md_url, exception_cb=mock.ANY, - headers={'Metadata': 'true'}, retries=2, - timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS, infinite=False) + self.network_md_url, + exception_cb=mock.ANY, + headers={"Metadata": "true"}, + retries=2, + timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS, + infinite=False, + ) - @mock.patch('cloudinit.url_helper.time.sleep') - @mock.patch(MOCKPATH + 'net.is_up', autospec=True) + @mock.patch("cloudinit.url_helper.time.sleep") + @mock.patch(MOCKPATH + "net.is_up", autospec=True) def test_get_metadata_from_imds_empty_when_no_imds_present( - self, m_net_is_up, m_sleep): + self, m_net_is_up, m_sleep + ): """Return empty dict when IMDS network metadata is absent.""" httpretty.register_uri( httpretty.GET, - dsaz.IMDS_URL + '/instance?api-version=2017-12-01', - body={}, status=404) + dsaz.IMDS_URL + "/instance?api-version=2017-12-01", + body={}, + status=404, + ) m_net_is_up.return_value = True # skips dhcp - self.assertEqual({}, dsaz.get_metadata_from_imds('eth9', retries=2)) + self.assertEqual({}, dsaz.get_metadata_from_imds("eth9", retries=2)) - m_net_is_up.assert_called_with('eth9') + m_net_is_up.assert_called_with("eth9") self.assertEqual([mock.call(1), mock.call(1)], m_sleep.call_args_list) self.assertIn( "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time - self.logs.getvalue()) + self.logs.getvalue(), + ) - @mock.patch('requests.Session.request') - @mock.patch('cloudinit.url_helper.time.sleep') - @mock.patch(MOCKPATH + 'net.is_up', autospec=True) + @mock.patch("requests.Session.request") + @mock.patch("cloudinit.url_helper.time.sleep") + @mock.patch(MOCKPATH + "net.is_up", autospec=True) def test_get_metadata_from_imds_retries_on_timeout( - self, m_net_is_up, m_sleep, m_request): + self, m_net_is_up, m_sleep, m_request + ): """Retry IMDS network metadata on timeout errors.""" self.attempt = 0 - m_request.side_effect = requests.Timeout('Fake Connection Timeout') + m_request.side_effect = requests.Timeout("Fake Connection Timeout") def retry_callback(request, uri, headers): self.attempt += 1 - raise requests.Timeout('Fake connection timeout') + raise requests.Timeout("Fake connection timeout") httpretty.register_uri( httpretty.GET, - dsaz.IMDS_URL + 'instance?api-version=2017-12-01', - body=retry_callback) + dsaz.IMDS_URL + "instance?api-version=2017-12-01", + body=retry_callback, + ) m_net_is_up.return_value = True # skips dhcp - self.assertEqual({}, dsaz.get_metadata_from_imds('eth9', retries=3)) + self.assertEqual({}, dsaz.get_metadata_from_imds("eth9", retries=3)) - m_net_is_up.assert_called_with('eth9') - self.assertEqual([mock.call(1)]*3, m_sleep.call_args_list) + m_net_is_up.assert_called_with("eth9") + self.assertEqual([mock.call(1)] * 3, m_sleep.call_args_list) self.assertIn( "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time - self.logs.getvalue()) + self.logs.getvalue(), + ) class TestAzureDataSource(CiTestCase): @@ -593,25 +688,35 @@ class TestAzureDataSource(CiTestCase): # patch cloud_dir, so our 'seed_dir' is guaranteed empty self.paths = helpers.Paths( - {'cloud_dir': self.tmp, 'run_dir': self.tmp}) - self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent') + {"cloud_dir": self.tmp, "run_dir": self.tmp} + ) + self.waagent_d = os.path.join(self.tmp, "var", "lib", "waagent") self.patches = ExitStack() self.addCleanup(self.patches.close) - self.patches.enter_context(mock.patch.object( - dsaz, '_get_random_seed', return_value='wild')) + self.patches.enter_context( + mock.patch.object(dsaz, "_get_random_seed", return_value="wild") + ) self.m_get_metadata_from_imds = self.patches.enter_context( mock.patch.object( - dsaz, 'get_metadata_from_imds', - mock.MagicMock(return_value=NETWORK_METADATA))) + dsaz, + "get_metadata_from_imds", + mock.MagicMock(return_value=NETWORK_METADATA), + ) + ) self.m_fallback_nic = self.patches.enter_context( - mock.patch('cloudinit.sources.net.find_fallback_nic', - return_value='eth9')) + mock.patch( + "cloudinit.sources.net.find_fallback_nic", return_value="eth9" + ) + ) self.m_remove_ubuntu_network_scripts = self.patches.enter_context( mock.patch.object( - dsaz, 'maybe_remove_ubuntu_network_config_scripts', - mock.MagicMock())) + dsaz, + "maybe_remove_ubuntu_network_config_scripts", + mock.MagicMock(), + ) + ) super(TestAzureDataSource, self).setUp() def apply_patches(self, patches): @@ -619,15 +724,21 @@ class TestAzureDataSource(CiTestCase): self.patches.enter_context(mock.patch.object(module, name, new)) def _get_mockds(self): - sysctl_out = "dev.storvsc.3.%pnpinfo: "\ - "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f "\ - "deviceid=f8b3781b-1e82-4818-a1c3-63d806ec15bb\n" - sysctl_out += "dev.storvsc.2.%pnpinfo: "\ - "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f "\ - "deviceid=f8b3781a-1e82-4818-a1c3-63d806ec15bb\n" - sysctl_out += "dev.storvsc.1.%pnpinfo: "\ - "classid=32412632-86cb-44a2-9b5c-50d1417354f5 "\ - "deviceid=00000000-0001-8899-0000-000000000000\n" + sysctl_out = ( + "dev.storvsc.3.%pnpinfo: " + "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f " + "deviceid=f8b3781b-1e82-4818-a1c3-63d806ec15bb\n" + ) + sysctl_out += ( + "dev.storvsc.2.%pnpinfo: " + "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f " + "deviceid=f8b3781a-1e82-4818-a1c3-63d806ec15bb\n" + ) + sysctl_out += ( + "dev.storvsc.1.%pnpinfo: " + "classid=32412632-86cb-44a2-9b5c-50d1417354f5 " + "deviceid=00000000-0001-8899-0000-000000000000\n" + ) camctl_devbus = """ scbus0 on ata0 bus 0 scbus1 on ata1 bus 0 @@ -642,45 +753,57 @@ scbus-1 on xpt0 bus 0 <Msft Virtual Disk 1.0> at scbus2 target 0 lun 0 (da0,pass1) <Msft Virtual Disk 1.0> at scbus3 target 1 lun 0 (da1,pass2) """ - self.apply_patches([ - (dsaz, 'get_dev_storvsc_sysctl', mock.MagicMock( - return_value=sysctl_out)), - (dsaz, 'get_camcontrol_dev_bus', mock.MagicMock( - return_value=camctl_devbus)), - (dsaz, 'get_camcontrol_dev', mock.MagicMock( - return_value=camctl_dev)) - ]) + self.apply_patches( + [ + ( + dsaz, + "get_dev_storvsc_sysctl", + mock.MagicMock(return_value=sysctl_out), + ), + ( + dsaz, + "get_camcontrol_dev_bus", + mock.MagicMock(return_value=camctl_devbus), + ), + ( + dsaz, + "get_camcontrol_dev", + mock.MagicMock(return_value=camctl_dev), + ), + ] + ) return dsaz - def _get_ds(self, data, distro='ubuntu', - apply_network=None, instance_id=None): - + def _get_ds( + self, data, distro="ubuntu", apply_network=None, instance_id=None + ): def _wait_for_files(flist, _maxwait=None, _naplen=None): - data['waited'] = flist + data["waited"] = flist return [] def _load_possible_azure_ds(seed_dir, cache_dir): yield seed_dir yield dsaz.DEFAULT_PROVISIONING_ISO_DEV - yield from data.get('dsdevs', []) + yield from data.get("dsdevs", []) if cache_dir: yield cache_dir seed_dir = os.path.join(self.paths.seed_dir, "azure") - if data.get('ovfcontent') is not None: - populate_dir(seed_dir, - {'ovf-env.xml': data['ovfcontent']}) + if data.get("ovfcontent") is not None: + populate_dir(seed_dir, {"ovf-env.xml": data["ovfcontent"]}) - dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d + dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d self.m_is_platform_viable = mock.MagicMock(autospec=True) self.m_get_metadata_from_fabric = mock.MagicMock( - return_value={'public-keys': []}) + return_value={"public-keys": []} + ) self.m_report_failure_to_fabric = mock.MagicMock(autospec=True) self.m_ephemeral_dhcpv4 = mock.MagicMock() self.m_ephemeral_dhcpv4_with_reporting = mock.MagicMock() self.m_list_possible_azure_ds = mock.MagicMock( - side_effect=_load_possible_azure_ds) + side_effect=_load_possible_azure_ds + ) if instance_id: self.instance_id = instance_id @@ -688,39 +811,59 @@ scbus-1 on xpt0 bus 0 self.instance_id = EXAMPLE_UUID def _dmi_mocks(key): - if key == 'system-uuid': + if key == "system-uuid": return self.instance_id - elif key == 'chassis-asset-tag': - return '7783-7084-3265-9085-8269-3286-77' - - self.apply_patches([ - (dsaz, 'list_possible_azure_ds', - self.m_list_possible_azure_ds), - (dsaz, '_is_platform_viable', - self.m_is_platform_viable), - (dsaz, 'get_metadata_from_fabric', - self.m_get_metadata_from_fabric), - (dsaz, 'report_failure_to_fabric', - self.m_report_failure_to_fabric), - (dsaz, 'EphemeralDHCPv4', self.m_ephemeral_dhcpv4), - (dsaz, 'EphemeralDHCPv4WithReporting', - self.m_ephemeral_dhcpv4_with_reporting), - (dsaz, 'get_boot_telemetry', mock.MagicMock()), - (dsaz, 'get_system_info', mock.MagicMock()), - (dsaz.subp, 'which', lambda x: True), - (dsaz.dmi, 'read_dmi_data', mock.MagicMock( - side_effect=_dmi_mocks)), - (dsaz.util, 'wait_for_files', mock.MagicMock( - side_effect=_wait_for_files)), - ]) + elif key == "chassis-asset-tag": + return "7783-7084-3265-9085-8269-3286-77" + + self.apply_patches( + [ + ( + dsaz, + "list_possible_azure_ds", + self.m_list_possible_azure_ds, + ), + (dsaz, "_is_platform_viable", self.m_is_platform_viable), + ( + dsaz, + "get_metadata_from_fabric", + self.m_get_metadata_from_fabric, + ), + ( + dsaz, + "report_failure_to_fabric", + self.m_report_failure_to_fabric, + ), + (dsaz, "EphemeralDHCPv4", self.m_ephemeral_dhcpv4), + ( + dsaz, + "EphemeralDHCPv4WithReporting", + self.m_ephemeral_dhcpv4_with_reporting, + ), + (dsaz, "get_boot_telemetry", mock.MagicMock()), + (dsaz, "get_system_info", mock.MagicMock()), + (dsaz.subp, "which", lambda x: True), + ( + dsaz.dmi, + "read_dmi_data", + mock.MagicMock(side_effect=_dmi_mocks), + ), + ( + dsaz.util, + "wait_for_files", + mock.MagicMock(side_effect=_wait_for_files), + ), + ] + ) if isinstance(distro, str): distro_cls = distros.fetch(distro) - distro = distro_cls(distro, data.get('sys_cfg', {}), self.paths) + distro = distro_cls(distro, data.get("sys_cfg", {}), self.paths) dsrc = dsaz.DataSourceAzure( - data.get('sys_cfg', {}), distro=distro, paths=self.paths) + data.get("sys_cfg", {}), distro=distro, paths=self.paths + ) if apply_network is not None: - dsrc.ds_cfg['apply_network_config'] = apply_network + dsrc.ds_cfg["apply_network_config"] = apply_network return dsrc @@ -774,19 +917,18 @@ scbus-1 on xpt0 bus 0 data = {} dsrc = self._get_ds(data) self.m_is_platform_viable.return_value = False - with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata, \ - mock.patch.object(dsrc, '_report_failure') as m_report_failure: + with mock.patch.object( + dsrc, "crawl_metadata" + ) as m_crawl_metadata, mock.patch.object( + dsrc, "_report_failure" + ) as m_report_failure: ret = dsrc.get_data() self.m_is_platform_viable.assert_called_with(dsrc.seed_dir) self.assertFalse(ret) # Assert that for non viable platforms, # there is no communication with the Azure datasource. - self.assertEqual( - 0, - m_crawl_metadata.call_count) - self.assertEqual( - 0, - m_report_failure.call_count) + self.assertEqual(0, m_crawl_metadata.call_count) + self.assertEqual(0, m_report_failure.call_count) def test_platform_viable_but_no_devs_should_return_no_datasource(self): """For platforms where the Azure platform is viable @@ -797,170 +939,190 @@ scbus-1 on xpt0 bus 0 """ data = {} dsrc = self._get_ds(data) - with mock.patch.object(dsrc, '_report_failure') as m_report_failure: + with mock.patch.object(dsrc, "_report_failure") as m_report_failure: self.m_is_platform_viable.return_value = True ret = dsrc.get_data() self.m_is_platform_viable.assert_called_with(dsrc.seed_dir) self.assertFalse(ret) - self.assertEqual( - 1, - m_report_failure.call_count) + self.assertEqual(1, m_report_failure.call_count) def test_crawl_metadata_exception_returns_no_datasource(self): data = {} dsrc = self._get_ds(data) self.m_is_platform_viable.return_value = True - with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata: + with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: m_crawl_metadata.side_effect = Exception ret = dsrc.get_data() self.m_is_platform_viable.assert_called_with(dsrc.seed_dir) - self.assertEqual( - 1, - m_crawl_metadata.call_count) + self.assertEqual(1, m_crawl_metadata.call_count) self.assertFalse(ret) def test_crawl_metadata_exception_should_report_failure_with_msg(self): data = {} dsrc = self._get_ds(data) self.m_is_platform_viable.return_value = True - with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata, \ - mock.patch.object(dsrc, '_report_failure') as m_report_failure: + with mock.patch.object( + dsrc, "crawl_metadata" + ) as m_crawl_metadata, mock.patch.object( + dsrc, "_report_failure" + ) as m_report_failure: m_crawl_metadata.side_effect = Exception dsrc.get_data() - self.assertEqual( - 1, - m_crawl_metadata.call_count) + self.assertEqual(1, m_crawl_metadata.call_count) m_report_failure.assert_called_once_with( - description=dsaz.DEFAULT_REPORT_FAILURE_USER_VISIBLE_MESSAGE) + description=dsaz.DEFAULT_REPORT_FAILURE_USER_VISIBLE_MESSAGE + ) def test_crawl_metadata_exc_should_log_could_not_crawl_msg(self): data = {} dsrc = self._get_ds(data) self.m_is_platform_viable.return_value = True - with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata: + with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: m_crawl_metadata.side_effect = Exception dsrc.get_data() - self.assertEqual( - 1, - m_crawl_metadata.call_count) + self.assertEqual(1, m_crawl_metadata.call_count) self.assertIn( - "Could not crawl Azure metadata", - self.logs.getvalue()) + "Could not crawl Azure metadata", self.logs.getvalue() + ) def test_basic_seed_dir(self): - odata = {'HostName': "myhost", 'UserName': "myuser"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': {}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertEqual(dsrc.userdata_raw, "") - self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName']) - self.assertTrue(os.path.isfile( - os.path.join(self.waagent_d, 'ovf-env.xml'))) - self.assertEqual('azure', dsrc.cloud_name) - self.assertEqual('azure', dsrc.platform_type) + self.assertEqual(dsrc.metadata["local-hostname"], odata["HostName"]) + self.assertTrue( + os.path.isfile(os.path.join(self.waagent_d, "ovf-env.xml")) + ) + self.assertEqual("azure", dsrc.cloud_name) + self.assertEqual("azure", dsrc.platform_type) self.assertEqual( - 'seed-dir (%s/seed/azure)' % self.tmp, dsrc.subplatform) + "seed-dir (%s/seed/azure)" % self.tmp, dsrc.subplatform + ) def test_basic_dev_file(self): """When a device path is used, present that in subplatform.""" - data = {'sys_cfg': {}, 'dsdevs': ['/dev/cd0']} + data = {"sys_cfg": {}, "dsdevs": ["/dev/cd0"]} dsrc = self._get_ds(data) # DSAzure will attempt to mount /dev/sr0 first, which should # fail with mount error since the list of devices doesn't have # /dev/sr0 - with mock.patch(MOCKPATH + 'util.mount_cb') as m_mount_cb: + with mock.patch(MOCKPATH + "util.mount_cb") as m_mount_cb: m_mount_cb.side_effect = [ MountFailedError("fail"), - ({'local-hostname': 'me'}, 'ud', {'cfg': ''}, {}) + ({"local-hostname": "me"}, "ud", {"cfg": ""}, {}), ] self.assertTrue(dsrc.get_data()) - self.assertEqual(dsrc.userdata_raw, 'ud') - self.assertEqual(dsrc.metadata['local-hostname'], 'me') - self.assertEqual('azure', dsrc.cloud_name) - self.assertEqual('azure', dsrc.platform_type) - self.assertEqual('config-disk (/dev/cd0)', dsrc.subplatform) + self.assertEqual(dsrc.userdata_raw, "ud") + self.assertEqual(dsrc.metadata["local-hostname"], "me") + self.assertEqual("azure", dsrc.cloud_name) + self.assertEqual("azure", dsrc.platform_type) + self.assertEqual("config-disk (/dev/cd0)", dsrc.subplatform) def test_get_data_non_ubuntu_will_not_remove_network_scripts(self): """get_data on non-Ubuntu will not remove ubuntu net scripts.""" - odata = {'HostName': "myhost", 'UserName': "myuser"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': {}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } - dsrc = self._get_ds(data, distro='debian') + dsrc = self._get_ds(data, distro="debian") dsrc.get_data() self.m_remove_ubuntu_network_scripts.assert_not_called() def test_get_data_on_ubuntu_will_remove_network_scripts(self): """get_data will remove ubuntu net scripts on Ubuntu distro.""" - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } - dsrc = self._get_ds(data, distro='ubuntu') + dsrc = self._get_ds(data, distro="ubuntu") dsrc.get_data() self.m_remove_ubuntu_network_scripts.assert_called_once_with() def test_get_data_on_ubuntu_will_not_remove_network_scripts_disabled(self): """When apply_network_config false, do not remove scripts on Ubuntu.""" - sys_cfg = {'datasource': {'Azure': {'apply_network_config': False}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": False}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } - dsrc = self._get_ds(data, distro='ubuntu') + dsrc = self._get_ds(data, distro="ubuntu") dsrc.get_data() self.m_remove_ubuntu_network_scripts.assert_not_called() def test_crawl_metadata_returns_structured_data_and_caches_nothing(self): """Return all structured metadata and cache no class attributes.""" yaml_cfg = "" - odata = {'HostName': "myhost", 'UserName': "myuser", - 'UserData': {'text': 'FOOBAR', 'encoding': 'plain'}, - 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': {}} + odata = { + "HostName": "myhost", + "UserName": "myuser", + "UserData": {"text": "FOOBAR", "encoding": "plain"}, + "dscfg": {"text": yaml_cfg, "encoding": "plain"}, + } + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } dsrc = self._get_ds(data) expected_cfg = { - 'PreprovisionedVMType': None, - 'PreprovisionedVm': False, - 'datasource': {'Azure': {}}, - 'system_info': {'default_user': {'name': 'myuser'}}} + "PreprovisionedVMType": None, + "PreprovisionedVm": False, + "datasource": {"Azure": {}}, + "system_info": {"default_user": {"name": "myuser"}}, + } expected_metadata = { - 'azure_data': { - 'configurationsettype': 'LinuxProvisioningConfiguration'}, - 'imds': NETWORK_METADATA, - 'instance-id': EXAMPLE_UUID, - 'local-hostname': 'myhost', - 'random_seed': 'wild'} + "azure_data": { + "configurationsettype": "LinuxProvisioningConfiguration" + }, + "imds": NETWORK_METADATA, + "instance-id": EXAMPLE_UUID, + "local-hostname": "myhost", + "random_seed": "wild", + } crawled_metadata = dsrc.crawl_metadata() self.assertCountEqual( crawled_metadata.keys(), - ['cfg', 'files', 'metadata', 'userdata_raw']) - self.assertEqual(crawled_metadata['cfg'], expected_cfg) + ["cfg", "files", "metadata", "userdata_raw"], + ) + self.assertEqual(crawled_metadata["cfg"], expected_cfg) self.assertEqual( - list(crawled_metadata['files'].keys()), ['ovf-env.xml']) + list(crawled_metadata["files"].keys()), ["ovf-env.xml"] + ) self.assertIn( - b'<HostName>myhost</HostName>', - crawled_metadata['files']['ovf-env.xml']) - self.assertEqual(crawled_metadata['metadata'], expected_metadata) - self.assertEqual(crawled_metadata['userdata_raw'], 'FOOBAR') + b"<HostName>myhost</HostName>", + crawled_metadata["files"]["ovf-env.xml"], + ) + self.assertEqual(crawled_metadata["metadata"], expected_metadata) + self.assertEqual(crawled_metadata["userdata_raw"], "FOOBAR") self.assertEqual(dsrc.userdata_raw, None) self.assertEqual(dsrc.metadata, {}) self.assertEqual(dsrc._metadata_imds, UNSET) - self.assertFalse(os.path.isfile( - os.path.join(self.waagent_d, 'ovf-env.xml'))) + self.assertFalse( + os.path.isfile(os.path.join(self.waagent_d, "ovf-env.xml")) + ) def test_crawl_metadata_raises_invalid_metadata_on_error(self): """crawl_metadata raises an exception on invalid ovf-env.xml.""" - data = {'ovfcontent': "BOGUS", 'sys_cfg': {}} + data = {"ovfcontent": "BOGUS", "sys_cfg": {}} dsrc = self._get_ds(data) - error_msg = ('BrokenAzureDataSource: Invalid ovf-env.xml:' - ' syntax error: line 1, column 0') + error_msg = ( + "BrokenAzureDataSource: Invalid ovf-env.xml:" + " syntax error: line 1, column 0" + ) with self.assertRaises(InvalidMetaDataException) as cm: dsrc.crawl_metadata() self.assertEqual(str(cm.exception), error_msg) @@ -971,20 +1133,19 @@ scbus-1 on xpt0 bus 0 platform_settings={"PreprovisionedVm": "False"} ) - data = { - 'ovfcontent': ovfenv, - 'sys_cfg': {} - } + data = {"ovfcontent": ovfenv, "sys_cfg": {}} dsrc = self._get_ds(data) dsrc.crawl_metadata() self.assertEqual(1, self.m_get_metadata_from_imds.call_count) @mock.patch( - 'cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting') - @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file') + "cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting" + ) + @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") @mock.patch( - 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready') - @mock.patch('cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds') + "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" + ) + @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds") def test_crawl_metadata_call_imds_twice_with_reprovision( self, poll_imds_func, m_report_ready, m_write, m_dhcp ): @@ -993,21 +1154,20 @@ scbus-1 on xpt0 bus 0 platform_settings={"PreprovisionedVm": "True"} ) - data = { - 'ovfcontent': ovfenv, - 'sys_cfg': {} - } + data = {"ovfcontent": ovfenv, "sys_cfg": {}} dsrc = self._get_ds(data) poll_imds_func.return_value = ovfenv dsrc.crawl_metadata() self.assertEqual(2, self.m_get_metadata_from_imds.call_count) @mock.patch( - 'cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting') - @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file') + "cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting" + ) + @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") @mock.patch( - 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready') - @mock.patch('cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds') + "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" + ) + @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds") def test_crawl_metadata_on_reprovision_reports_ready( self, poll_imds_func, m_report_ready, m_write, m_dhcp ): @@ -1016,37 +1176,36 @@ scbus-1 on xpt0 bus 0 platform_settings={"PreprovisionedVm": "True"} ) - data = { - 'ovfcontent': ovfenv, - 'sys_cfg': {} - } + data = {"ovfcontent": ovfenv, "sys_cfg": {}} dsrc = self._get_ds(data) poll_imds_func.return_value = ovfenv dsrc.crawl_metadata() self.assertEqual(1, m_report_ready.call_count) @mock.patch( - 'cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting') - @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file') + "cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting" + ) + @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") @mock.patch( - 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready') - @mock.patch('cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds') + "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" + ) + @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds") @mock.patch( - 'cloudinit.sources.DataSourceAzure.DataSourceAzure.' - '_wait_for_all_nics_ready') + "cloudinit.sources.DataSourceAzure.DataSourceAzure." + "_wait_for_all_nics_ready" + ) def test_crawl_metadata_waits_for_nic_on_savable_vms( self, detect_nics, poll_imds_func, report_ready_func, m_write, m_dhcp ): """If reprovisioning, report ready at the end""" ovfenv = construct_valid_ovf_env( - platform_settings={"PreprovisionedVMType": "Savable", - "PreprovisionedVm": "True"} + platform_settings={ + "PreprovisionedVMType": "Savable", + "PreprovisionedVm": "True", + } ) - data = { - 'ovfcontent': ovfenv, - 'sys_cfg': {} - } + data = {"ovfcontent": ovfenv, "sys_cfg": {}} dsrc = self._get_ds(data) poll_imds_func.return_value = ovfenv dsrc.crawl_metadata() @@ -1054,18 +1213,27 @@ scbus-1 on xpt0 bus 0 self.assertEqual(1, detect_nics.call_count) @mock.patch( - 'cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting') - @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file') + "cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting" + ) + @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") @mock.patch( - 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready') - @mock.patch('cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds') + "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" + ) + @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds") @mock.patch( - 'cloudinit.sources.DataSourceAzure.DataSourceAzure.' - '_wait_for_all_nics_ready') - @mock.patch('os.path.isfile') + "cloudinit.sources.DataSourceAzure.DataSourceAzure." + "_wait_for_all_nics_ready" + ) + @mock.patch("os.path.isfile") def test_detect_nics_when_marker_present( - self, is_file, detect_nics, poll_imds_func, report_ready_func, m_write, - m_dhcp): + self, + is_file, + detect_nics, + poll_imds_func, + report_ready_func, + m_write, + m_dhcp, + ): """If reprovisioning, wait for nic attach if marker present""" def is_file_ret(key): @@ -1074,10 +1242,7 @@ scbus-1 on xpt0 bus 0 is_file.side_effect = is_file_ret ovfenv = construct_valid_ovf_env() - data = { - 'ovfcontent': ovfenv, - 'sys_cfg': {} - } + data = {"ovfcontent": ovfenv, "sys_cfg": {}} dsrc = self._get_ds(data) poll_imds_func.return_value = ovfenv @@ -1085,29 +1250,28 @@ scbus-1 on xpt0 bus 0 self.assertEqual(1, report_ready_func.call_count) self.assertEqual(1, detect_nics.call_count) - @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file') - @mock.patch('cloudinit.sources.helpers.netlink.' - 'wait_for_media_disconnect_connect') + @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") @mock.patch( - 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready') - @mock.patch('cloudinit.sources.DataSourceAzure.readurl') + "cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect" + ) + @mock.patch( + "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" + ) + @mock.patch("cloudinit.sources.DataSourceAzure.readurl") def test_crawl_metadata_on_reprovision_reports_ready_using_lease( - self, m_readurl, m_report_ready, - m_media_switch, m_write + self, m_readurl, m_report_ready, m_media_switch, m_write ): """If reprovisioning, report ready using the obtained lease""" ovfenv = construct_valid_ovf_env( platform_settings={"PreprovisionedVm": "True"} ) - data = { - 'ovfcontent': ovfenv, - 'sys_cfg': {} - } + data = {"ovfcontent": ovfenv, "sys_cfg": {}} dsrc = self._get_ds(data) - with mock.patch.object(dsrc.distro.networking, 'is_up') \ - as m_dsrc_distro_networking_is_up: + with mock.patch.object( + dsrc.distro.networking, "is_up" + ) as m_dsrc_distro_networking_is_up: # For this mock, net should not be up, # so that cached ephemeral won't be used. @@ -1116,16 +1280,21 @@ scbus-1 on xpt0 bus 0 m_dsrc_distro_networking_is_up.return_value = False lease = { - 'interface': 'eth9', 'fixed-address': '192.168.2.9', - 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0', - 'unknown-245': '624c3620'} - self.m_ephemeral_dhcpv4_with_reporting.return_value \ - .__enter__.return_value = lease + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + self.m_ephemeral_dhcpv4_with_reporting.return_value.__enter__.return_value = ( # noqa: E501 + lease + ) m_media_switch.return_value = None reprovision_ovfenv = construct_valid_ovf_env() m_readurl.return_value = url_helper.StringResponse( - reprovision_ovfenv.encode('utf-8')) + reprovision_ovfenv.encode("utf-8") + ) dsrc.crawl_metadata() self.assertEqual(2, m_report_ready.call_count) @@ -1133,91 +1302,118 @@ scbus-1 on xpt0 bus 0 def test_waagent_d_has_0700_perms(self): # we expect /var/lib/waagent to be created 0700 - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) ret = dsrc.get_data() self.assertTrue(ret) self.assertTrue(os.path.isdir(self.waagent_d)) self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700) - @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', - return_value=None) + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) def test_network_config_set_from_imds(self, m_driver): """Datasource.network_config returns IMDS network data.""" - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } expected_network_config = { - 'ethernets': { - 'eth0': {'set-name': 'eth0', - 'match': {'macaddress': '00:0d:3a:04:75:98'}, - 'dhcp6': False, - 'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 100}}}, - 'version': 2} + "ethernets": { + "eth0": { + "set-name": "eth0", + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + } + }, + "version": 2, + } dsrc = self._get_ds(data) dsrc.get_data() self.assertEqual(expected_network_config, dsrc.network_config) - @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', - return_value=None) + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) def test_network_config_set_from_imds_route_metric_for_secondary_nic( - self, m_driver): + self, m_driver + ): """Datasource.network_config adds route-metric to secondary nics.""" - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } expected_network_config = { - 'ethernets': { - 'eth0': {'set-name': 'eth0', - 'match': {'macaddress': '00:0d:3a:04:75:98'}, - 'dhcp6': False, - 'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 100}}, - 'eth1': {'set-name': 'eth1', - 'match': {'macaddress': '22:0d:3a:04:75:98'}, - 'dhcp6': False, - 'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 200}}, - 'eth2': {'set-name': 'eth2', - 'match': {'macaddress': '33:0d:3a:04:75:98'}, - 'dhcp6': False, - 'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 300}}}, - 'version': 2} + "ethernets": { + "eth0": { + "set-name": "eth0", + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + }, + "eth1": { + "set-name": "eth1", + "match": {"macaddress": "22:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 200}, + }, + "eth2": { + "set-name": "eth2", + "match": {"macaddress": "33:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 300}, + }, + }, + "version": 2, + } imds_data = copy.deepcopy(NETWORK_METADATA) - imds_data['network']['interface'].append(SECONDARY_INTERFACE) + imds_data["network"]["interface"].append(SECONDARY_INTERFACE) third_intf = copy.deepcopy(SECONDARY_INTERFACE) - third_intf['macAddress'] = third_intf['macAddress'].replace('22', '33') - third_intf['ipv4']['subnet'][0]['address'] = '10.0.2.0' - third_intf['ipv4']['ipAddress'][0]['privateIpAddress'] = '10.0.2.6' - imds_data['network']['interface'].append(third_intf) + third_intf["macAddress"] = third_intf["macAddress"].replace("22", "33") + third_intf["ipv4"]["subnet"][0]["address"] = "10.0.2.0" + third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6" + imds_data["network"]["interface"].append(third_intf) self.m_get_metadata_from_imds.return_value = imds_data dsrc = self._get_ds(data) dsrc.get_data() self.assertEqual(expected_network_config, dsrc.network_config) - @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', - return_value=None) + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) def test_network_config_set_from_imds_for_secondary_nic_no_ip( - self, m_driver): + self, m_driver + ): """If an IP address is empty then there should no config for it.""" - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } expected_network_config = { - 'ethernets': { - 'eth0': {'set-name': 'eth0', - 'match': {'macaddress': '00:0d:3a:04:75:98'}, - 'dhcp6': False, - 'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 100}}}, - 'version': 2} + "ethernets": { + "eth0": { + "set-name": "eth0", + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + } + }, + "version": 2, + } imds_data = copy.deepcopy(NETWORK_METADATA) - imds_data['network']['interface'].append(SECONDARY_INTERFACE_NO_IP) + imds_data["network"]["interface"].append(SECONDARY_INTERFACE_NO_IP) self.m_get_metadata_from_imds.return_value = imds_data dsrc = self._get_ds(data) dsrc.get_data() @@ -1225,91 +1421,110 @@ scbus-1 on xpt0 bus 0 def test_availability_zone_set_from_imds(self): """Datasource.availability returns IMDS platformFaultDomain.""" - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } dsrc = self._get_ds(data) dsrc.get_data() - self.assertEqual('0', dsrc.availability_zone) + self.assertEqual("0", dsrc.availability_zone) def test_region_set_from_imds(self): """Datasource.region returns IMDS region location.""" - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } dsrc = self._get_ds(data) dsrc.get_data() - self.assertEqual('eastus2', dsrc.region) + self.assertEqual("eastus2", dsrc.region) def test_sys_cfg_set_never_destroy_ntfs(self): - sys_cfg = {'datasource': {'Azure': { - 'never_destroy_ntfs': 'user-supplied-value'}}} - data = {'ovfcontent': construct_valid_ovf_env(data={}), - 'sys_cfg': sys_cfg} + sys_cfg = { + "datasource": { + "Azure": {"never_destroy_ntfs": "user-supplied-value"} + } + } + data = { + "ovfcontent": construct_valid_ovf_env(data={}), + "sys_cfg": sys_cfg, + } dsrc = self._get_ds(data) ret = self._get_and_setup(dsrc) self.assertTrue(ret) - self.assertEqual(dsrc.ds_cfg.get(dsaz.DS_CFG_KEY_PRESERVE_NTFS), - 'user-supplied-value') + self.assertEqual( + dsrc.ds_cfg.get(dsaz.DS_CFG_KEY_PRESERVE_NTFS), + "user-supplied-value", + ) def test_username_used(self): - odata = {'HostName': "myhost", 'UserName': "myuser"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = {"ovfcontent": construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(dsrc.cfg['system_info']['default_user']['name'], - "myuser") + self.assertEqual( + dsrc.cfg["system_info"]["default_user"]["name"], "myuser" + ) def test_password_given(self): - odata = {'HostName': "myhost", 'UserName': "myuser", - 'UserPassword': "mypass"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + odata = { + "HostName": "myhost", + "UserName": "myuser", + "UserPassword": "mypass", + } + data = {"ovfcontent": construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) - self.assertIn('default_user', dsrc.cfg['system_info']) - defuser = dsrc.cfg['system_info']['default_user'] + self.assertIn("default_user", dsrc.cfg["system_info"]) + defuser = dsrc.cfg["system_info"]["default_user"] # default user should be updated username and should not be locked. - self.assertEqual(defuser['name'], odata['UserName']) - self.assertFalse(defuser['lock_passwd']) + self.assertEqual(defuser["name"], odata["UserName"]) + self.assertFalse(defuser["lock_passwd"]) # passwd is crypt formated string $id$salt$encrypted # encrypting plaintext with salt value of everything up to final '$' # should equal that after the '$' - pos = defuser['passwd'].rfind("$") + 1 - self.assertEqual(defuser['passwd'], - crypt.crypt(odata['UserPassword'], - defuser['passwd'][0:pos])) + pos = defuser["passwd"].rfind("$") + 1 + self.assertEqual( + defuser["passwd"], + crypt.crypt(odata["UserPassword"], defuser["passwd"][0:pos]), + ) # the same hashed value should also be present in cfg['password'] - self.assertEqual(defuser['passwd'], dsrc.cfg['password']) + self.assertEqual(defuser["passwd"], dsrc.cfg["password"]) def test_user_not_locked_if_password_redacted(self): - odata = {'HostName': "myhost", 'UserName': "myuser", - 'UserPassword': dsaz.DEF_PASSWD_REDACTION} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + odata = { + "HostName": "myhost", + "UserName": "myuser", + "UserPassword": dsaz.DEF_PASSWD_REDACTION, + } + data = {"ovfcontent": construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) - self.assertIn('default_user', dsrc.cfg['system_info']) - defuser = dsrc.cfg['system_info']['default_user'] + self.assertIn("default_user", dsrc.cfg["system_info"]) + defuser = dsrc.cfg["system_info"]["default_user"] # default user should be updated username and should not be locked. - self.assertEqual(defuser['name'], odata['UserName']) - self.assertIn('lock_passwd', defuser) - self.assertFalse(defuser['lock_passwd']) + self.assertEqual(defuser["name"], odata["UserName"]) + self.assertIn("lock_passwd", defuser) + self.assertFalse(defuser["lock_passwd"]) def test_userdata_plain(self): mydata = "FOOBAR" - odata = {'UserData': {'text': mydata, 'encoding': 'plain'}} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + odata = {"UserData": {"text": mydata, "encoding": "plain"}} + data = {"ovfcontent": construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() @@ -1318,72 +1533,86 @@ scbus-1 on xpt0 bus 0 def test_userdata_found(self): mydata = "FOOBAR" - odata = {'UserData': {'text': b64e(mydata), 'encoding': 'base64'}} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + odata = {"UserData": {"text": b64e(mydata), "encoding": "base64"}} + data = {"ovfcontent": construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(dsrc.userdata_raw, mydata.encode('utf-8')) + self.assertEqual(dsrc.userdata_raw, mydata.encode("utf-8")) def test_default_ephemeral_configs_ephemeral_exists(self): # make sure the ephemeral configs are correct if disk present odata = {} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': {}} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } orig_exists = dsaz.os.path.exists def changed_exists(path): - return True if path == dsaz.RESOURCE_DISK_PATH else orig_exists( - path) + return ( + True if path == dsaz.RESOURCE_DISK_PATH else orig_exists(path) + ) - with mock.patch(MOCKPATH + 'os.path.exists', new=changed_exists): + with mock.patch(MOCKPATH + "os.path.exists", new=changed_exists): dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) cfg = dsrc.get_config_obj() - self.assertEqual(dsrc.device_name_to_device("ephemeral0"), - dsaz.RESOURCE_DISK_PATH) - assert 'disk_setup' in cfg - assert 'fs_setup' in cfg - self.assertIsInstance(cfg['disk_setup'], dict) - self.assertIsInstance(cfg['fs_setup'], list) + self.assertEqual( + dsrc.device_name_to_device("ephemeral0"), + dsaz.RESOURCE_DISK_PATH, + ) + assert "disk_setup" in cfg + assert "fs_setup" in cfg + self.assertIsInstance(cfg["disk_setup"], dict) + self.assertIsInstance(cfg["fs_setup"], list) def test_default_ephemeral_configs_ephemeral_does_not_exist(self): # make sure the ephemeral configs are correct if disk not present odata = {} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': {}} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } orig_exists = dsaz.os.path.exists def changed_exists(path): - return False if path == dsaz.RESOURCE_DISK_PATH else orig_exists( - path) + return ( + False if path == dsaz.RESOURCE_DISK_PATH else orig_exists(path) + ) - with mock.patch(MOCKPATH + 'os.path.exists', new=changed_exists): + with mock.patch(MOCKPATH + "os.path.exists", new=changed_exists): dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) cfg = dsrc.get_config_obj() - assert 'disk_setup' not in cfg - assert 'fs_setup' not in cfg + assert "disk_setup" not in cfg + assert "fs_setup" not in cfg def test_provide_disk_aliases(self): # Make sure that user can affect disk aliases - dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}} - odata = {'HostName': "myhost", 'UserName': "myuser", - 'dscfg': {'text': b64e(yaml.dump(dscfg)), - 'encoding': 'base64'}} - usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'}, - 'ephemeral0': False}} - userdata = '#cloud-config' + yaml.dump(usercfg) + "\n" + dscfg = {"disk_aliases": {"ephemeral0": "/dev/sdc"}} + odata = { + "HostName": "myhost", + "UserName": "myuser", + "dscfg": {"text": b64e(yaml.dump(dscfg)), "encoding": "base64"}, + } + usercfg = { + "disk_setup": { + "/dev/sdc": {"something": "..."}, + "ephemeral0": False, + } + } + userdata = "#cloud-config" + yaml.dump(usercfg) + "\n" ovfcontent = construct_valid_ovf_env(data=odata, userdata=userdata) - data = {'ovfcontent': ovfcontent, 'sys_cfg': {}} + data = {"ovfcontent": ovfcontent, "sys_cfg": {}} dsrc = self._get_ds(data) ret = dsrc.get_data() @@ -1394,92 +1623,95 @@ scbus-1 on xpt0 bus 0 def test_userdata_arrives(self): userdata = "This is my user-data" xml = construct_valid_ovf_env(data={}, userdata=userdata) - data = {'ovfcontent': xml} + data = {"ovfcontent": xml} dsrc = self._get_ds(data) dsrc.get_data() - self.assertEqual(userdata.encode('us-ascii'), dsrc.userdata_raw) + self.assertEqual(userdata.encode("us-ascii"), dsrc.userdata_raw) def test_password_redacted_in_ovf(self): - odata = {'HostName': "myhost", 'UserName': "myuser", - 'UserPassword': "mypass"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata)} + odata = { + "HostName": "myhost", + "UserName": "myuser", + "UserPassword": "mypass", + } + data = {"ovfcontent": construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) - ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml') + ovf_env_path = os.path.join(self.waagent_d, "ovf-env.xml") # The XML should not be same since the user password is redacted on_disk_ovf = load_file(ovf_env_path) - self.xml_notequals(data['ovfcontent'], on_disk_ovf) + self.xml_notequals(data["ovfcontent"], on_disk_ovf) # Make sure that the redacted password on disk is not used by CI - self.assertNotEqual(dsrc.cfg.get('password'), - dsaz.DEF_PASSWD_REDACTION) + self.assertNotEqual( + dsrc.cfg.get("password"), dsaz.DEF_PASSWD_REDACTION + ) # Make sure that the password was really encrypted et = ET.fromstring(on_disk_ovf) for elem in et.iter(): - if 'UserPassword' in elem.tag: + if "UserPassword" in elem.tag: self.assertEqual(dsaz.DEF_PASSWD_REDACTION, elem.text) def test_ovf_env_arrives_in_waagent_dir(self): xml = construct_valid_ovf_env(data={}, userdata="FOODATA") - dsrc = self._get_ds({'ovfcontent': xml}) + dsrc = self._get_ds({"ovfcontent": xml}) dsrc.get_data() # 'data_dir' is '/var/lib/waagent' (walinux-agent's state dir) # we expect that the ovf-env.xml file is copied there. - ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml') + ovf_env_path = os.path.join(self.waagent_d, "ovf-env.xml") self.assertTrue(os.path.exists(ovf_env_path)) self.xml_equals(xml, load_file(ovf_env_path)) def test_ovf_can_include_unicode(self): xml = construct_valid_ovf_env(data={}) - xml = '\ufeff{0}'.format(xml) - dsrc = self._get_ds({'ovfcontent': xml}) + xml = "\ufeff{0}".format(xml) + dsrc = self._get_ds({"ovfcontent": xml}) dsrc.get_data() - def test_dsaz_report_ready_returns_true_when_report_succeeds( - self): - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + def test_dsaz_report_ready_returns_true_when_report_succeeds(self): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) self.assertTrue(dsrc._report_ready(lease=mock.MagicMock())) - def test_dsaz_report_ready_returns_false_and_does_not_propagate_exc( - self): - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + def test_dsaz_report_ready_returns_false_and_does_not_propagate_exc(self): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) self.m_get_metadata_from_fabric.side_effect = Exception self.assertFalse(dsrc._report_ready(lease=mock.MagicMock())) def test_dsaz_report_failure_returns_true_when_report_succeeds(self): - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) - with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata: + with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: # mock crawl metadata failure to cause report failure m_crawl_metadata.side_effect = Exception self.assertTrue(dsrc._report_failure()) - self.assertEqual( - 1, - self.m_report_failure_to_fabric.call_count) + self.assertEqual(1, self.m_report_failure_to_fabric.call_count) def test_dsaz_report_failure_returns_false_and_does_not_propagate_exc( - self): - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) - - with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata, \ - mock.patch.object(dsrc, '_ephemeral_dhcp_ctx') \ - as m_ephemeral_dhcp_ctx, \ - mock.patch.object(dsrc.distro.networking, 'is_up') \ - as m_dsrc_distro_networking_is_up: + self, + ): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + + with mock.patch.object( + dsrc, "crawl_metadata" + ) as m_crawl_metadata, mock.patch.object( + dsrc, "_ephemeral_dhcp_ctx" + ) as m_ephemeral_dhcp_ctx, mock.patch.object( + dsrc.distro.networking, "is_up" + ) as m_dsrc_distro_networking_is_up: # mock crawl metadata failure to cause report failure m_crawl_metadata.side_effect = Exception # setup mocks to allow using cached ephemeral dhcp lease m_dsrc_distro_networking_is_up.return_value = True - test_lease_dhcp_option_245 = 'test_lease_dhcp_option_245' - test_lease = {'unknown-245': test_lease_dhcp_option_245} + test_lease_dhcp_option_245 = "test_lease_dhcp_option_245" + test_lease = {"unknown-245": test_lease_dhcp_option_245} m_ephemeral_dhcp_ctx.lease = test_lease # We expect 3 calls to report_failure_to_fabric, @@ -1490,91 +1722,97 @@ scbus-1 on xpt0 bus 0 # 3. Using fallback lease to report failure to Azure self.m_report_failure_to_fabric.side_effect = Exception self.assertFalse(dsrc._report_failure()) - self.assertEqual( - 3, - self.m_report_failure_to_fabric.call_count) + self.assertEqual(3, self.m_report_failure_to_fabric.call_count) def test_dsaz_report_failure_description_msg(self): - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) - with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata: + with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: # mock crawl metadata failure to cause report failure m_crawl_metadata.side_effect = Exception - test_msg = 'Test report failure description message' + test_msg = "Test report failure description message" self.assertTrue(dsrc._report_failure(description=test_msg)) self.m_report_failure_to_fabric.assert_called_once_with( - dhcp_opts=mock.ANY, description=test_msg) + dhcp_opts=mock.ANY, description=test_msg + ) def test_dsaz_report_failure_no_description_msg(self): - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) - with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata: + with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: m_crawl_metadata.side_effect = Exception self.assertTrue(dsrc._report_failure()) # no description msg self.m_report_failure_to_fabric.assert_called_once_with( - dhcp_opts=mock.ANY, description=None) + dhcp_opts=mock.ANY, description=None + ) def test_dsaz_report_failure_uses_cached_ephemeral_dhcp_ctx_lease(self): - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) - - with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata, \ - mock.patch.object(dsrc, '_ephemeral_dhcp_ctx') \ - as m_ephemeral_dhcp_ctx, \ - mock.patch.object(dsrc.distro.networking, 'is_up') \ - as m_dsrc_distro_networking_is_up: + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + + with mock.patch.object( + dsrc, "crawl_metadata" + ) as m_crawl_metadata, mock.patch.object( + dsrc, "_ephemeral_dhcp_ctx" + ) as m_ephemeral_dhcp_ctx, mock.patch.object( + dsrc.distro.networking, "is_up" + ) as m_dsrc_distro_networking_is_up: # mock crawl metadata failure to cause report failure m_crawl_metadata.side_effect = Exception # setup mocks to allow using cached ephemeral dhcp lease m_dsrc_distro_networking_is_up.return_value = True - test_lease_dhcp_option_245 = 'test_lease_dhcp_option_245' - test_lease = {'unknown-245': test_lease_dhcp_option_245} + test_lease_dhcp_option_245 = "test_lease_dhcp_option_245" + test_lease = {"unknown-245": test_lease_dhcp_option_245} m_ephemeral_dhcp_ctx.lease = test_lease self.assertTrue(dsrc._report_failure()) # ensure called with cached ephemeral dhcp lease option 245 self.m_report_failure_to_fabric.assert_called_once_with( - description=mock.ANY, dhcp_opts=test_lease_dhcp_option_245) + description=mock.ANY, dhcp_opts=test_lease_dhcp_option_245 + ) # ensure cached ephemeral is cleaned - self.assertEqual( - 1, - m_ephemeral_dhcp_ctx.clean_network.call_count) + self.assertEqual(1, m_ephemeral_dhcp_ctx.clean_network.call_count) def test_dsaz_report_failure_no_net_uses_new_ephemeral_dhcp_lease(self): - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) - with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata, \ - mock.patch.object(dsrc.distro.networking, 'is_up') \ - as m_dsrc_distro_networking_is_up: + with mock.patch.object( + dsrc, "crawl_metadata" + ) as m_crawl_metadata, mock.patch.object( + dsrc.distro.networking, "is_up" + ) as m_dsrc_distro_networking_is_up: # mock crawl metadata failure to cause report failure m_crawl_metadata.side_effect = Exception # net is not up and cannot use cached ephemeral dhcp m_dsrc_distro_networking_is_up.return_value = False # setup ephemeral dhcp lease discovery mock - test_lease_dhcp_option_245 = 'test_lease_dhcp_option_245' - test_lease = {'unknown-245': test_lease_dhcp_option_245} - self.m_ephemeral_dhcpv4_with_reporting.return_value \ - .__enter__.return_value = test_lease + test_lease_dhcp_option_245 = "test_lease_dhcp_option_245" + test_lease = {"unknown-245": test_lease_dhcp_option_245} + self.m_ephemeral_dhcpv4_with_reporting.return_value.__enter__.return_value = ( # noqa: E501 + test_lease + ) self.assertTrue(dsrc._report_failure()) # ensure called with the newly discovered # ephemeral dhcp lease option 245 self.m_report_failure_to_fabric.assert_called_once_with( - description=mock.ANY, dhcp_opts=test_lease_dhcp_option_245) + description=mock.ANY, dhcp_opts=test_lease_dhcp_option_245 + ) - def test_dsaz_report_failure_no_net_and_no_dhcp_uses_fallback_lease( - self): - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + def test_dsaz_report_failure_no_net_and_no_dhcp_uses_fallback_lease(self): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) - with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata, \ - mock.patch.object(dsrc.distro.networking, 'is_up') \ - as m_dsrc_distro_networking_is_up: + with mock.patch.object( + dsrc, "crawl_metadata" + ) as m_crawl_metadata, mock.patch.object( + dsrc.distro.networking, "is_up" + ) as m_dsrc_distro_networking_is_up: # mock crawl metadata failure to cause report failure m_crawl_metadata.side_effect = Exception @@ -1582,29 +1820,31 @@ scbus-1 on xpt0 bus 0 m_dsrc_distro_networking_is_up.return_value = False # ephemeral dhcp discovery failure, # so cannot use a new ephemeral dhcp - self.m_ephemeral_dhcpv4_with_reporting.return_value \ - .__enter__.side_effect = Exception + self.m_ephemeral_dhcpv4_with_reporting.return_value.__enter__.side_effect = ( # noqa: E501 + Exception + ) self.assertTrue(dsrc._report_failure()) # ensure called with fallback lease self.m_report_failure_to_fabric.assert_called_once_with( description=mock.ANY, - fallback_lease_file=dsrc.dhclient_lease_file) + fallback_lease_file=dsrc.dhclient_lease_file, + ) def test_exception_fetching_fabric_data_doesnt_propagate(self): """Errors communicating with fabric should warn, but return True.""" - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) self.m_get_metadata_from_fabric.side_effect = Exception ret = self._get_and_setup(dsrc) self.assertTrue(ret) def test_fabric_data_included_in_metadata(self): - dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) - self.m_get_metadata_from_fabric.return_value = {'test': 'value'} + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + self.m_get_metadata_from_fabric.return_value = {"test": "value"} ret = self._get_and_setup(dsrc) self.assertTrue(ret) - self.assertEqual('value', dsrc.metadata['test']) + self.assertEqual("value", dsrc.metadata["test"]) def test_instance_id_case_insensitive(self): """Return the previous iid when current is a case-insensitive match.""" @@ -1612,152 +1852,180 @@ scbus-1 on xpt0 bus 0 upper_iid = EXAMPLE_UUID.upper() # lowercase current UUID ds = self._get_ds( - {'ovfcontent': construct_valid_ovf_env()}, instance_id=lower_iid + {"ovfcontent": construct_valid_ovf_env()}, instance_id=lower_iid ) # UPPERCASE previous write_file( - os.path.join(self.paths.cloud_dir, 'data', 'instance-id'), - upper_iid) + os.path.join(self.paths.cloud_dir, "data", "instance-id"), + upper_iid, + ) ds.get_data() - self.assertEqual(upper_iid, ds.metadata['instance-id']) + self.assertEqual(upper_iid, ds.metadata["instance-id"]) # UPPERCASE current UUID ds = self._get_ds( - {'ovfcontent': construct_valid_ovf_env()}, instance_id=upper_iid + {"ovfcontent": construct_valid_ovf_env()}, instance_id=upper_iid ) # lowercase previous write_file( - os.path.join(self.paths.cloud_dir, 'data', 'instance-id'), - lower_iid) + os.path.join(self.paths.cloud_dir, "data", "instance-id"), + lower_iid, + ) ds.get_data() - self.assertEqual(lower_iid, ds.metadata['instance-id']) + self.assertEqual(lower_iid, ds.metadata["instance-id"]) def test_instance_id_endianness(self): """Return the previous iid when dmi uuid is the byteswapped iid.""" - ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + ds = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) # byte-swapped previous write_file( - os.path.join(self.paths.cloud_dir, 'data', 'instance-id'), - '544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8') + os.path.join(self.paths.cloud_dir, "data", "instance-id"), + "544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8", + ) ds.get_data() self.assertEqual( - '544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8', ds.metadata['instance-id']) + "544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8", ds.metadata["instance-id"] + ) # not byte-swapped previous write_file( - os.path.join(self.paths.cloud_dir, 'data', 'instance-id'), - '644CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8') + os.path.join(self.paths.cloud_dir, "data", "instance-id"), + "644CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8", + ) ds.get_data() - self.assertEqual(self.instance_id, ds.metadata['instance-id']) + self.assertEqual(self.instance_id, ds.metadata["instance-id"]) def test_instance_id_from_dmidecode_used(self): - ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + ds = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) ds.get_data() - self.assertEqual(self.instance_id, ds.metadata['instance-id']) + self.assertEqual(self.instance_id, ds.metadata["instance-id"]) def test_instance_id_from_dmidecode_used_for_builtin(self): - ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()}) + ds = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) ds.get_data() - self.assertEqual(self.instance_id, ds.metadata['instance-id']) + self.assertEqual(self.instance_id, ds.metadata["instance-id"]) - @mock.patch(MOCKPATH + 'util.is_FreeBSD') - @mock.patch(MOCKPATH + '_check_freebsd_cdrom') - def test_list_possible_azure_ds(self, m_check_fbsd_cdrom, - m_is_FreeBSD): + @mock.patch(MOCKPATH + "util.is_FreeBSD") + @mock.patch(MOCKPATH + "_check_freebsd_cdrom") + def test_list_possible_azure_ds(self, m_check_fbsd_cdrom, m_is_FreeBSD): """On FreeBSD, possible devs should show /dev/cd0.""" m_is_FreeBSD.return_value = True m_check_fbsd_cdrom.return_value = True possible_ds = [] - for src in dsaz.list_possible_azure_ds( - "seed_dir", "cache_dir"): + for src in dsaz.list_possible_azure_ds("seed_dir", "cache_dir"): possible_ds.append(src) - self.assertEqual(possible_ds, ["seed_dir", - dsaz.DEFAULT_PROVISIONING_ISO_DEV, - "/dev/cd0", - "cache_dir"]) self.assertEqual( - [mock.call("/dev/cd0")], m_check_fbsd_cdrom.call_args_list) + possible_ds, + [ + "seed_dir", + dsaz.DEFAULT_PROVISIONING_ISO_DEV, + "/dev/cd0", + "cache_dir", + ], + ) + self.assertEqual( + [mock.call("/dev/cd0")], m_check_fbsd_cdrom.call_args_list + ) - @mock.patch('cloudinit.sources.DataSourceAzure.device_driver', - return_value=None) - @mock.patch('cloudinit.net.generate_fallback_config') + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + @mock.patch("cloudinit.net.generate_fallback_config") def test_imds_network_config(self, mock_fallback, m_driver): """Network config is generated from IMDS network data when present.""" - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) expected_cfg = { - 'ethernets': { - 'eth0': {'dhcp4': True, - 'dhcp4-overrides': {'route-metric': 100}, - 'dhcp6': False, - 'match': {'macaddress': '00:0d:3a:04:75:98'}, - 'set-name': 'eth0'}}, - 'version': 2} + "ethernets": { + "eth0": { + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": False, + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "set-name": "eth0", + } + }, + "version": 2, + } self.assertEqual(expected_cfg, dsrc.network_config) mock_fallback.assert_not_called() - @mock.patch('cloudinit.net.get_interface_mac') - @mock.patch('cloudinit.net.get_devicelist') - @mock.patch('cloudinit.net.device_driver') - @mock.patch('cloudinit.net.generate_fallback_config') + @mock.patch("cloudinit.net.get_interface_mac") + @mock.patch("cloudinit.net.get_devicelist") + @mock.patch("cloudinit.net.device_driver") + @mock.patch("cloudinit.net.generate_fallback_config") def test_imds_network_ignored_when_apply_network_config_false( - self, mock_fallback, mock_dd, mock_devlist, mock_get_mac): + self, mock_fallback, mock_dd, mock_devlist, mock_get_mac + ): """When apply_network_config is False, use fallback instead of IMDS.""" - sys_cfg = {'datasource': {'Azure': {'apply_network_config': False}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": False}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } fallback_config = { - 'version': 1, - 'config': [{ - 'type': 'physical', 'name': 'eth0', - 'mac_address': '00:11:22:33:44:55', - 'params': {'driver': 'hv_netsvc'}, - 'subnets': [{'type': 'dhcp'}], - }] + "version": 1, + "config": [ + { + "type": "physical", + "name": "eth0", + "mac_address": "00:11:22:33:44:55", + "params": {"driver": "hv_netsvc"}, + "subnets": [{"type": "dhcp"}], + } + ], } mock_fallback.return_value = fallback_config - mock_devlist.return_value = ['eth0'] - mock_dd.return_value = ['hv_netsvc'] - mock_get_mac.return_value = '00:11:22:33:44:55' + mock_devlist.return_value = ["eth0"] + mock_dd.return_value = ["hv_netsvc"] + mock_get_mac.return_value = "00:11:22:33:44:55" dsrc = self._get_ds(data) self.assertTrue(dsrc.get_data()) self.assertEqual(dsrc.network_config, fallback_config) - @mock.patch('cloudinit.net.get_interface_mac') - @mock.patch('cloudinit.net.get_devicelist') - @mock.patch('cloudinit.net.device_driver') - @mock.patch('cloudinit.net.generate_fallback_config', autospec=True) - def test_fallback_network_config(self, mock_fallback, mock_dd, - mock_devlist, mock_get_mac): + @mock.patch("cloudinit.net.get_interface_mac") + @mock.patch("cloudinit.net.get_devicelist") + @mock.patch("cloudinit.net.device_driver") + @mock.patch("cloudinit.net.generate_fallback_config", autospec=True) + def test_fallback_network_config( + self, mock_fallback, mock_dd, mock_devlist, mock_get_mac + ): """On absent IMDS network data, generate network fallback config.""" - odata = {'HostName': "myhost", 'UserName': "myuser"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': {}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } fallback_config = { - 'version': 1, - 'config': [{ - 'type': 'physical', 'name': 'eth0', - 'mac_address': '00:11:22:33:44:55', - 'params': {'driver': 'hv_netsvc'}, - 'subnets': [{'type': 'dhcp'}], - }] + "version": 1, + "config": [ + { + "type": "physical", + "name": "eth0", + "mac_address": "00:11:22:33:44:55", + "params": {"driver": "hv_netsvc"}, + "subnets": [{"type": "dhcp"}], + } + ], } mock_fallback.return_value = fallback_config - mock_devlist.return_value = ['eth0'] - mock_dd.return_value = ['hv_netsvc'] - mock_get_mac.return_value = '00:11:22:33:44:55' + mock_devlist.return_value = ["eth0"] + mock_dd.return_value = ["hv_netsvc"] + mock_get_mac.return_value = "00:11:22:33:44:55" dsrc = self._get_ds(data) # Represent empty response from network imds @@ -1768,37 +2036,41 @@ scbus-1 on xpt0 bus 0 netconfig = dsrc.network_config self.assertEqual(netconfig, fallback_config) mock_fallback.assert_called_with( - blacklist_drivers=['mlx4_core', 'mlx5_core'], - config_driver=True) + blacklist_drivers=["mlx4_core", "mlx5_core"], config_driver=True + ) - @mock.patch(MOCKPATH + 'net.get_interfaces', autospec=True) - def test_blacklist_through_distro( - self, m_net_get_interfaces): + @mock.patch(MOCKPATH + "net.get_interfaces", autospec=True) + def test_blacklist_through_distro(self, m_net_get_interfaces): """Verify Azure DS updates blacklist drivers in the distro's - networking object.""" - odata = {'HostName': "myhost", 'UserName': "myuser"} - data = {'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': {}} + networking object.""" + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } - distro_cls = distros.fetch('ubuntu') - distro = distro_cls('ubuntu', {}, self.paths) + distro_cls = distros.fetch("ubuntu") + distro = distro_cls("ubuntu", {}, self.paths) dsrc = self._get_ds(data, distro=distro) dsrc.get_data() - self.assertEqual(distro.networking.blacklist_drivers, - dsaz.BLACKLIST_DRIVERS) + self.assertEqual( + distro.networking.blacklist_drivers, dsaz.BLACKLIST_DRIVERS + ) distro.networking.get_interfaces_by_mac() m_net_get_interfaces.assert_called_with( - blacklist_drivers=dsaz.BLACKLIST_DRIVERS) + blacklist_drivers=dsaz.BLACKLIST_DRIVERS + ) @mock.patch( - 'cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates') + "cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates" + ) def test_get_public_ssh_keys_with_imds(self, m_parse_certificates): - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} data = { - 'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) dsrc.get_data() @@ -1808,32 +2080,32 @@ scbus-1 on xpt0 bus 0 self.assertEqual(m_parse_certificates.call_count, 0) def test_key_without_crlf_valid(self): - test_key = 'ssh-rsa somerandomkeystuff some comment' + test_key = "ssh-rsa somerandomkeystuff some comment" assert True is dsaz._key_is_openssh_formatted(test_key) def test_key_with_crlf_invalid(self): - test_key = 'ssh-rsa someran\r\ndomkeystuff some comment' + test_key = "ssh-rsa someran\r\ndomkeystuff some comment" assert False is dsaz._key_is_openssh_formatted(test_key) def test_key_endswith_crlf_valid(self): - test_key = 'ssh-rsa somerandomkeystuff some comment\r\n' + test_key = "ssh-rsa somerandomkeystuff some comment\r\n" assert True is dsaz._key_is_openssh_formatted(test_key) @mock.patch( - 'cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates') - @mock.patch(MOCKPATH + 'get_metadata_from_imds') + "cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates" + ) + @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_get_public_ssh_keys_with_no_openssh_format( - self, - m_get_metadata_from_imds, - m_parse_certificates): + self, m_get_metadata_from_imds, m_parse_certificates + ): imds_data = copy.deepcopy(NETWORK_METADATA) - imds_data['compute']['publicKeys'][0]['keyData'] = 'no-openssh-format' + imds_data["compute"]["publicKeys"][0]["keyData"] = "no-openssh-format" m_get_metadata_from_imds.return_value = imds_data - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} data = { - 'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) dsrc.get_data() @@ -1842,38 +2114,37 @@ scbus-1 on xpt0 bus 0 self.assertEqual(ssh_keys, []) self.assertEqual(m_parse_certificates.call_count, 0) - @mock.patch(MOCKPATH + 'get_metadata_from_imds') - def test_get_public_ssh_keys_without_imds( - self, - m_get_metadata_from_imds): + @mock.patch(MOCKPATH + "get_metadata_from_imds") + def test_get_public_ssh_keys_without_imds(self, m_get_metadata_from_imds): m_get_metadata_from_imds.return_value = dict() - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} data = { - 'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) - dsaz.get_metadata_from_fabric.return_value = {'public-keys': ['key2']} + dsaz.get_metadata_from_fabric.return_value = {"public-keys": ["key2"]} dsrc.get_data() dsrc.setup(True) ssh_keys = dsrc.get_public_ssh_keys() - self.assertEqual(ssh_keys, ['key2']) + self.assertEqual(ssh_keys, ["key2"]) - @mock.patch(MOCKPATH + 'get_metadata_from_imds') + @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_imds_api_version_wanted_nonexistent( - self, - m_get_metadata_from_imds): + self, m_get_metadata_from_imds + ): def get_metadata_from_imds_side_eff(*args, **kwargs): - if kwargs['api_version'] == dsaz.IMDS_VER_WANT: + if kwargs["api_version"] == dsaz.IMDS_VER_WANT: raise url_helper.UrlError("No IMDS version", code=400) return NETWORK_METADATA + m_get_metadata_from_imds.side_effect = get_metadata_from_imds_side_eff - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} data = { - 'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) dsrc.get_data() @@ -1881,86 +2152,86 @@ scbus-1 on xpt0 bus 0 self.assertTrue(dsrc.failed_desired_api_version) @mock.patch( - MOCKPATH + 'get_metadata_from_imds', return_value=NETWORK_METADATA) + MOCKPATH + "get_metadata_from_imds", return_value=NETWORK_METADATA + ) def test_imds_api_version_wanted_exists(self, m_get_metadata_from_imds): - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} data = { - 'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) dsrc.get_data() self.assertIsNotNone(dsrc.metadata) self.assertFalse(dsrc.failed_desired_api_version) - @mock.patch(MOCKPATH + 'get_metadata_from_imds') + @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_hostname_from_imds(self, m_get_metadata_from_imds): - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} data = { - 'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, } imds_data_with_os_profile = copy.deepcopy(NETWORK_METADATA) imds_data_with_os_profile["compute"]["osProfile"] = dict( adminUsername="username1", computerName="hostname1", - disablePasswordAuthentication="true" + disablePasswordAuthentication="true", ) m_get_metadata_from_imds.return_value = imds_data_with_os_profile dsrc = self._get_ds(data) dsrc.get_data() self.assertEqual(dsrc.metadata["local-hostname"], "hostname1") - @mock.patch(MOCKPATH + 'get_metadata_from_imds') + @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_username_from_imds(self, m_get_metadata_from_imds): - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} data = { - 'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, } imds_data_with_os_profile = copy.deepcopy(NETWORK_METADATA) imds_data_with_os_profile["compute"]["osProfile"] = dict( adminUsername="username1", computerName="hostname1", - disablePasswordAuthentication="true" + disablePasswordAuthentication="true", ) m_get_metadata_from_imds.return_value = imds_data_with_os_profile dsrc = self._get_ds(data) dsrc.get_data() self.assertEqual( - dsrc.cfg["system_info"]["default_user"]["name"], - "username1" + dsrc.cfg["system_info"]["default_user"]["name"], "username1" ) - @mock.patch(MOCKPATH + 'get_metadata_from_imds') + @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_disable_password_from_imds(self, m_get_metadata_from_imds): - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} data = { - 'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, } imds_data_with_os_profile = copy.deepcopy(NETWORK_METADATA) imds_data_with_os_profile["compute"]["osProfile"] = dict( adminUsername="username1", computerName="hostname1", - disablePasswordAuthentication="true" + disablePasswordAuthentication="true", ) m_get_metadata_from_imds.return_value = imds_data_with_os_profile dsrc = self._get_ds(data) dsrc.get_data() self.assertTrue(dsrc.metadata["disable_password"]) - @mock.patch(MOCKPATH + 'get_metadata_from_imds') + @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_userdata_from_imds(self, m_get_metadata_from_imds): - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} - odata = {'HostName': "myhost", 'UserName': "myuser"} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} data = { - 'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, } userdata = "userdataImds" imds_data = copy.deepcopy(NETWORK_METADATA) @@ -1974,20 +2245,22 @@ scbus-1 on xpt0 bus 0 dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(dsrc.userdata_raw, userdata.encode('utf-8')) + self.assertEqual(dsrc.userdata_raw, userdata.encode("utf-8")) - @mock.patch(MOCKPATH + 'get_metadata_from_imds') + @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_userdata_from_imds_with_customdata_from_OVF( - self, m_get_metadata_from_imds): + self, m_get_metadata_from_imds + ): userdataOVF = "userdataOVF" odata = { - 'HostName': "myhost", 'UserName': "myuser", - 'UserData': {'text': b64e(userdataOVF), 'encoding': 'base64'} + "HostName": "myhost", + "UserName": "myuser", + "UserData": {"text": b64e(userdataOVF), "encoding": "base64"}, } - sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}} + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} data = { - 'ovfcontent': construct_valid_ovf_env(data=odata), - 'sys_cfg': sys_cfg + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, } userdataImds = "userdataImds" @@ -2002,7 +2275,7 @@ scbus-1 on xpt0 bus 0 dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) - self.assertEqual(dsrc.userdata_raw, userdataOVF.encode('utf-8')) + self.assertEqual(dsrc.userdata_raw, userdataOVF.encode("utf-8")) class TestLoadAzureDsDir(CiTestCase): @@ -2017,39 +2290,40 @@ class TestLoadAzureDsDir(CiTestCase): with self.assertRaises(dsaz.NonAzureDataSource) as context_manager: dsaz.load_azure_ds_dir(self.source_dir) self.assertEqual( - 'No ovf-env file found', - str(context_manager.exception)) + "No ovf-env file found", str(context_manager.exception) + ) def test_wb_invalid_ovf_env_xml_calls_read_azure_ovf(self): """load_azure_ds_dir calls read_azure_ovf to parse the xml.""" - ovf_path = os.path.join(self.source_dir, 'ovf-env.xml') - with open(ovf_path, 'wb') as stream: - stream.write(b'invalid xml') + ovf_path = os.path.join(self.source_dir, "ovf-env.xml") + with open(ovf_path, "wb") as stream: + stream.write(b"invalid xml") with self.assertRaises(dsaz.BrokenAzureDataSource) as context_manager: dsaz.load_azure_ds_dir(self.source_dir) self.assertEqual( - 'Invalid ovf-env.xml: syntax error: line 1, column 0', - str(context_manager.exception)) + "Invalid ovf-env.xml: syntax error: line 1, column 0", + str(context_manager.exception), + ) class TestReadAzureOvf(CiTestCase): - def test_invalid_xml_raises_non_azure_ds(self): invalid_xml = "<foo>" + construct_valid_ovf_env(data={}) - self.assertRaises(dsaz.BrokenAzureDataSource, - dsaz.read_azure_ovf, invalid_xml) + self.assertRaises( + dsaz.BrokenAzureDataSource, dsaz.read_azure_ovf, invalid_xml + ) def test_load_with_pubkeys(self): - mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}] - pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist] + mypklist = [{"fingerprint": "fp1", "path": "path1", "value": ""}] + pubkeys = [(x["fingerprint"], x["path"], x["value"]) for x in mypklist] content = construct_valid_ovf_env(pubkeys=pubkeys) (_md, _ud, cfg) = dsaz.read_azure_ovf(content) for mypk in mypklist: - self.assertIn(mypk, cfg['_pubkeys']) + self.assertIn(mypk, cfg["_pubkeys"]) class TestCanDevBeReformatted(CiTestCase): - warning_file = 'dataloss_warning_readme.txt' + warning_file = "dataloss_warning_readme.txt" def _domock(self, mockpath, sattr=None): patcher = mock.patch(mockpath) @@ -2060,42 +2334,42 @@ class TestCanDevBeReformatted(CiTestCase): bypath = {} for path, data in devs.items(): bypath[path] = data - if 'realpath' in data: - bypath[data['realpath']] = data - for ppath, pdata in data.get('partitions', {}).items(): + if "realpath" in data: + bypath[data["realpath"]] = data + for ppath, pdata in data.get("partitions", {}).items(): bypath[ppath] = pdata - if 'realpath' in data: - bypath[pdata['realpath']] = pdata + if "realpath" in data: + bypath[pdata["realpath"]] = pdata def realpath(d): - return bypath[d].get('realpath', d) + return bypath[d].get("realpath", d) def partitions_on_device(devpath): - parts = bypath.get(devpath, {}).get('partitions', {}) + parts = bypath.get(devpath, {}).get("partitions", {}) ret = [] for path, data in parts.items(): - ret.append((data.get('num'), realpath(path))) + ret.append((data.get("num"), realpath(path))) # return sorted by partition number return sorted(ret, key=lambda d: d[0]) def mount_cb(device, callback, mtype, update_env_for_mount): - self.assertEqual('ntfs', mtype) - self.assertEqual('C', update_env_for_mount.get('LANG')) + self.assertEqual("ntfs", mtype) + self.assertEqual("C", update_env_for_mount.get("LANG")) p = self.tmp_dir() - for f in bypath.get(device).get('files', []): + for f in bypath.get(device).get("files", []): write_file(os.path.join(p, f), content=f) return callback(p) def has_ntfs_fs(device): - return bypath.get(device, {}).get('fs') == 'ntfs' + return bypath.get(device, {}).get("fs") == "ntfs" p = MOCKPATH - self._domock(p + "_partitions_on_device", 'm_partitions_on_device') - self._domock(p + "_has_ntfs_filesystem", 'm_has_ntfs_filesystem') - self._domock(p + "util.mount_cb", 'm_mount_cb') - self._domock(p + "os.path.realpath", 'm_realpath') - self._domock(p + "os.path.exists", 'm_exists') - self._domock(p + "util.SeLinuxGuard", 'm_selguard') + self._domock(p + "_partitions_on_device", "m_partitions_on_device") + self._domock(p + "_has_ntfs_filesystem", "m_has_ntfs_filesystem") + self._domock(p + "util.mount_cb", "m_mount_cb") + self._domock(p + "os.path.realpath", "m_realpath") + self._domock(p + "os.path.exists", "m_exists") + self._domock(p + "util.SeLinuxGuard", "m_selguard") self.m_exists.side_effect = lambda p: p in bypath self.m_realpath.side_effect = realpath @@ -2107,330 +2381,433 @@ class TestCanDevBeReformatted(CiTestCase): def test_three_partitions_is_false(self): """A disk with 3 partitions can not be formatted.""" - self.patchup({ - '/dev/sda': { - 'partitions': { - '/dev/sda1': {'num': 1}, - '/dev/sda2': {'num': 2}, - '/dev/sda3': {'num': 3}, - }}}) - value, msg = dsaz.can_dev_be_reformatted("/dev/sda", - preserve_ntfs=False) + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1}, + "/dev/sda2": {"num": 2}, + "/dev/sda3": {"num": 3}, + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) self.assertFalse(value) self.assertIn("3 or more", msg.lower()) def test_no_partitions_is_false(self): """A disk with no partitions can not be formatted.""" - self.patchup({'/dev/sda': {}}) - value, msg = dsaz.can_dev_be_reformatted("/dev/sda", - preserve_ntfs=False) + self.patchup({"/dev/sda": {}}) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) self.assertFalse(value) self.assertIn("not partitioned", msg.lower()) def test_two_partitions_not_ntfs_false(self): """2 partitions and 2nd not ntfs can not be formatted.""" - self.patchup({ - '/dev/sda': { - 'partitions': { - '/dev/sda1': {'num': 1}, - '/dev/sda2': {'num': 2, 'fs': 'ext4', 'files': []}, - }}}) - value, msg = dsaz.can_dev_be_reformatted("/dev/sda", - preserve_ntfs=False) + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1}, + "/dev/sda2": {"num": 2, "fs": "ext4", "files": []}, + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) self.assertFalse(value) self.assertIn("not ntfs", msg.lower()) def test_two_partitions_ntfs_populated_false(self): """2 partitions and populated ntfs fs on 2nd can not be formatted.""" - self.patchup({ - '/dev/sda': { - 'partitions': { - '/dev/sda1': {'num': 1}, - '/dev/sda2': {'num': 2, 'fs': 'ntfs', - 'files': ['secret.txt']}, - }}}) - value, msg = dsaz.can_dev_be_reformatted("/dev/sda", - preserve_ntfs=False) + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1}, + "/dev/sda2": { + "num": 2, + "fs": "ntfs", + "files": ["secret.txt"], + }, + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) self.assertFalse(value) self.assertIn("files on it", msg.lower()) def test_two_partitions_ntfs_empty_is_true(self): """2 partitions and empty ntfs fs on 2nd can be formatted.""" - self.patchup({ - '/dev/sda': { - 'partitions': { - '/dev/sda1': {'num': 1}, - '/dev/sda2': {'num': 2, 'fs': 'ntfs', 'files': []}, - }}}) - value, msg = dsaz.can_dev_be_reformatted("/dev/sda", - preserve_ntfs=False) + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1}, + "/dev/sda2": {"num": 2, "fs": "ntfs", "files": []}, + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) self.assertTrue(value) self.assertIn("safe for", msg.lower()) def test_one_partition_not_ntfs_false(self): """1 partition witih fs other than ntfs can not be formatted.""" - self.patchup({ - '/dev/sda': { - 'partitions': { - '/dev/sda1': {'num': 1, 'fs': 'zfs'}, - }}}) - value, msg = dsaz.can_dev_be_reformatted("/dev/sda", - preserve_ntfs=False) + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1, "fs": "zfs"}, + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) self.assertFalse(value) self.assertIn("not ntfs", msg.lower()) def test_one_partition_ntfs_populated_false(self): """1 mountable ntfs partition with many files can not be formatted.""" - self.patchup({ - '/dev/sda': { - 'partitions': { - '/dev/sda1': {'num': 1, 'fs': 'ntfs', - 'files': ['file1.txt', 'file2.exe']}, - }}}) - with mock.patch.object(dsaz.LOG, 'warning') as warning: - value, msg = dsaz.can_dev_be_reformatted("/dev/sda", - preserve_ntfs=False) + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": { + "num": 1, + "fs": "ntfs", + "files": ["file1.txt", "file2.exe"], + }, + } + } + } + ) + with mock.patch.object(dsaz.LOG, "warning") as warning: + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) wmsg = warning.call_args[0][0] - self.assertIn("looks like you're using NTFS on the ephemeral disk", - wmsg) + self.assertIn( + "looks like you're using NTFS on the ephemeral disk", wmsg + ) self.assertFalse(value) self.assertIn("files on it", msg.lower()) def test_one_partition_ntfs_empty_is_true(self): """1 mountable ntfs partition and no files can be formatted.""" - self.patchup({ - '/dev/sda': { - 'partitions': { - '/dev/sda1': {'num': 1, 'fs': 'ntfs', 'files': []} - }}}) - value, msg = dsaz.can_dev_be_reformatted("/dev/sda", - preserve_ntfs=False) + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1, "fs": "ntfs", "files": []} + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) self.assertTrue(value) self.assertIn("safe for", msg.lower()) def test_one_partition_ntfs_empty_with_dataloss_file_is_true(self): """1 mountable ntfs partition and only warn file can be formatted.""" - self.patchup({ - '/dev/sda': { - 'partitions': { - '/dev/sda1': {'num': 1, 'fs': 'ntfs', - 'files': ['dataloss_warning_readme.txt']} - }}}) - value, msg = dsaz.can_dev_be_reformatted("/dev/sda", - preserve_ntfs=False) + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": { + "num": 1, + "fs": "ntfs", + "files": ["dataloss_warning_readme.txt"], + } + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) self.assertTrue(value) self.assertIn("safe for", msg.lower()) def test_one_partition_through_realpath_is_true(self): """A symlink to a device with 1 ntfs partition can be formatted.""" - epath = '/dev/disk/cloud/azure_resource' - self.patchup({ - epath: { - 'realpath': '/dev/sdb', - 'partitions': { - epath + '-part1': { - 'num': 1, 'fs': 'ntfs', 'files': [self.warning_file], - 'realpath': '/dev/sdb1'} - }}}) - value, msg = dsaz.can_dev_be_reformatted(epath, - preserve_ntfs=False) + epath = "/dev/disk/cloud/azure_resource" + self.patchup( + { + epath: { + "realpath": "/dev/sdb", + "partitions": { + epath + + "-part1": { + "num": 1, + "fs": "ntfs", + "files": [self.warning_file], + "realpath": "/dev/sdb1", + } + }, + } + } + ) + value, msg = dsaz.can_dev_be_reformatted(epath, preserve_ntfs=False) self.assertTrue(value) self.assertIn("safe for", msg.lower()) def test_three_partition_through_realpath_is_false(self): """A symlink to a device with 3 partitions can not be formatted.""" - epath = '/dev/disk/cloud/azure_resource' - self.patchup({ - epath: { - 'realpath': '/dev/sdb', - 'partitions': { - epath + '-part1': { - 'num': 1, 'fs': 'ntfs', 'files': [self.warning_file], - 'realpath': '/dev/sdb1'}, - epath + '-part2': {'num': 2, 'fs': 'ext3', - 'realpath': '/dev/sdb2'}, - epath + '-part3': {'num': 3, 'fs': 'ext', - 'realpath': '/dev/sdb3'} - }}}) - value, msg = dsaz.can_dev_be_reformatted(epath, - preserve_ntfs=False) + epath = "/dev/disk/cloud/azure_resource" + self.patchup( + { + epath: { + "realpath": "/dev/sdb", + "partitions": { + epath + + "-part1": { + "num": 1, + "fs": "ntfs", + "files": [self.warning_file], + "realpath": "/dev/sdb1", + }, + epath + + "-part2": { + "num": 2, + "fs": "ext3", + "realpath": "/dev/sdb2", + }, + epath + + "-part3": { + "num": 3, + "fs": "ext", + "realpath": "/dev/sdb3", + }, + }, + } + } + ) + value, msg = dsaz.can_dev_be_reformatted(epath, preserve_ntfs=False) self.assertFalse(value) self.assertIn("3 or more", msg.lower()) def test_ntfs_mount_errors_true(self): """can_dev_be_reformatted does not fail if NTFS is unknown fstype.""" - self.patchup({ - '/dev/sda': { - 'partitions': { - '/dev/sda1': {'num': 1, 'fs': 'ntfs', 'files': []} - }}}) + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1, "fs": "ntfs", "files": []} + } + } + } + ) error_msgs = [ "Stderr: mount: unknown filesystem type 'ntfs'", # RHEL - "Stderr: mount: /dev/sdb1: unknown filesystem type 'ntfs'" # SLES + "Stderr: mount: /dev/sdb1: unknown filesystem type 'ntfs'", # SLES ] for err_msg in error_msgs: self.m_mount_cb.side_effect = MountFailedError( - "Failed mounting %s to %s due to: \nUnexpected.\n%s" % - ('/dev/sda', '/fake-tmp/dir', err_msg)) + "Failed mounting %s to %s due to: \nUnexpected.\n%s" + % ("/dev/sda", "/fake-tmp/dir", err_msg) + ) - value, msg = dsaz.can_dev_be_reformatted('/dev/sda', - preserve_ntfs=False) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) self.assertTrue(value) - self.assertIn('cannot mount NTFS, assuming', msg) + self.assertIn("cannot mount NTFS, assuming", msg) def test_never_destroy_ntfs_config_false(self): """Normally formattable situation with never_destroy_ntfs set.""" - self.patchup({ - '/dev/sda': { - 'partitions': { - '/dev/sda1': {'num': 1, 'fs': 'ntfs', - 'files': ['dataloss_warning_readme.txt']} - }}}) - value, msg = dsaz.can_dev_be_reformatted("/dev/sda", - preserve_ntfs=True) + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": { + "num": 1, + "fs": "ntfs", + "files": ["dataloss_warning_readme.txt"], + } + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=True + ) self.assertFalse(value) - self.assertIn("config says to never destroy NTFS " - "(datasource.Azure.never_destroy_ntfs)", msg) + self.assertIn( + "config says to never destroy NTFS " + "(datasource.Azure.never_destroy_ntfs)", + msg, + ) class TestClearCachedData(CiTestCase): - def test_clear_cached_attrs_clears_imds(self): """All class attributes are reset to defaults, including imds data.""" tmp = self.tmp_dir() - paths = helpers.Paths( - {'cloud_dir': tmp, 'run_dir': tmp}) + paths = helpers.Paths({"cloud_dir": tmp, "run_dir": tmp}) dsrc = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=paths) clean_values = [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds] - dsrc.metadata = 'md' - dsrc.userdata = 'ud' - dsrc._metadata_imds = 'imds' + dsrc.metadata = "md" + dsrc.userdata = "ud" + dsrc._metadata_imds = "imds" dsrc._dirty_cache = True dsrc.clear_cached_attrs() self.assertEqual( - [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds], - clean_values) + [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds], clean_values + ) class TestAzureNetExists(CiTestCase): - def test_azure_net_must_exist_for_legacy_objpkl(self): """DataSourceAzureNet must exist for old obj.pkl files - that reference it.""" + that reference it.""" self.assertTrue(hasattr(dsaz, "DataSourceAzureNet")) class TestPreprovisioningReadAzureOvfFlag(CiTestCase): - def test_read_azure_ovf_with_true_flag(self): """The read_azure_ovf method should set the PreprovisionedVM - cfg flag if the proper setting is present.""" + cfg flag if the proper setting is present.""" content = construct_valid_ovf_env( - platform_settings={"PreprovisionedVm": "True"}) + platform_settings={"PreprovisionedVm": "True"} + ) ret = dsaz.read_azure_ovf(content) cfg = ret[2] - self.assertTrue(cfg['PreprovisionedVm']) + self.assertTrue(cfg["PreprovisionedVm"]) def test_read_azure_ovf_with_false_flag(self): """The read_azure_ovf method should set the PreprovisionedVM - cfg flag to false if the proper setting is false.""" + cfg flag to false if the proper setting is false.""" content = construct_valid_ovf_env( - platform_settings={"PreprovisionedVm": "False"}) + platform_settings={"PreprovisionedVm": "False"} + ) ret = dsaz.read_azure_ovf(content) cfg = ret[2] - self.assertFalse(cfg['PreprovisionedVm']) + self.assertFalse(cfg["PreprovisionedVm"]) def test_read_azure_ovf_without_flag(self): """The read_azure_ovf method should not set the - PreprovisionedVM cfg flag.""" + PreprovisionedVM cfg flag.""" content = construct_valid_ovf_env() ret = dsaz.read_azure_ovf(content) cfg = ret[2] - self.assertFalse(cfg['PreprovisionedVm']) + self.assertFalse(cfg["PreprovisionedVm"]) self.assertEqual(None, cfg["PreprovisionedVMType"]) def test_read_azure_ovf_with_running_type(self): """The read_azure_ovf method should set PreprovisionedVMType - cfg flag to Running.""" + cfg flag to Running.""" content = construct_valid_ovf_env( - platform_settings={"PreprovisionedVMType": "Running", - "PreprovisionedVm": "True"}) + platform_settings={ + "PreprovisionedVMType": "Running", + "PreprovisionedVm": "True", + } + ) ret = dsaz.read_azure_ovf(content) cfg = ret[2] - self.assertTrue(cfg['PreprovisionedVm']) - self.assertEqual("Running", cfg['PreprovisionedVMType']) + self.assertTrue(cfg["PreprovisionedVm"]) + self.assertEqual("Running", cfg["PreprovisionedVMType"]) def test_read_azure_ovf_with_savable_type(self): """The read_azure_ovf method should set PreprovisionedVMType - cfg flag to Savable.""" + cfg flag to Savable.""" content = construct_valid_ovf_env( - platform_settings={"PreprovisionedVMType": "Savable", - "PreprovisionedVm": "True"}) + platform_settings={ + "PreprovisionedVMType": "Savable", + "PreprovisionedVm": "True", + } + ) ret = dsaz.read_azure_ovf(content) cfg = ret[2] - self.assertTrue(cfg['PreprovisionedVm']) - self.assertEqual("Savable", cfg['PreprovisionedVMType']) + self.assertTrue(cfg["PreprovisionedVm"]) + self.assertEqual("Savable", cfg["PreprovisionedVMType"]) -@mock.patch('os.path.isfile') +@mock.patch("os.path.isfile") class TestPreprovisioningShouldReprovision(CiTestCase): - def setUp(self): super(TestPreprovisioningShouldReprovision, self).setUp() tmp = self.tmp_dir() - self.waagent_d = self.tmp_path('/var/lib/waagent', tmp) - self.paths = helpers.Paths({'cloud_dir': tmp}) - dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d + self.waagent_d = self.tmp_path("/var/lib/waagent", tmp) + self.paths = helpers.Paths({"cloud_dir": tmp}) + dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d - @mock.patch(MOCKPATH + 'util.write_file') + @mock.patch(MOCKPATH + "util.write_file") def test__should_reprovision_with_true_cfg(self, isfile, write_f): """The _should_reprovision method should return true with config - flag present.""" + flag present.""" isfile.return_value = False dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) - self.assertTrue(dsa._should_reprovision( - (None, None, {'PreprovisionedVm': True}, None))) + self.assertTrue( + dsa._should_reprovision( + (None, None, {"PreprovisionedVm": True}, None) + ) + ) def test__should_reprovision_with_file_existing(self, isfile): """The _should_reprovision method should return True if the sentinal - exists.""" + exists.""" isfile.return_value = True dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) - self.assertTrue(dsa._should_reprovision( - (None, None, {'preprovisionedvm': False}, None))) + self.assertTrue( + dsa._should_reprovision( + (None, None, {"preprovisionedvm": False}, None) + ) + ) def test__should_reprovision_returns_false(self, isfile): """The _should_reprovision method should return False - if config and sentinal are not present.""" + if config and sentinal are not present.""" isfile.return_value = False dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) self.assertFalse(dsa._should_reprovision((None, None, {}, None))) - @mock.patch(MOCKPATH + 'util.write_file', autospec=True) + @mock.patch(MOCKPATH + "util.write_file", autospec=True) def test__should_reprovision_uses_imds_md(self, write_file, isfile): """The _should_reprovision method should be able to retrieve the preprovisioning VM type from imds metadata""" isfile.return_value = False dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) - self.assertTrue(dsa._should_reprovision( - (None, None, {}, None), - {'extended': {'compute': {'ppsType': 'Running'}}})) - self.assertFalse(dsa._should_reprovision( - (None, None, {}, None), - {})) - self.assertFalse(dsa._should_reprovision( - (None, None, {}, None), - {'extended': {'compute': {"hasCustomData": False}}})) - - @mock.patch(MOCKPATH + 'DataSourceAzure._poll_imds') + self.assertTrue( + dsa._should_reprovision( + (None, None, {}, None), + {"extended": {"compute": {"ppsType": "Running"}}}, + ) + ) + self.assertFalse(dsa._should_reprovision((None, None, {}, None), {})) + self.assertFalse( + dsa._should_reprovision( + (None, None, {}, None), + {"extended": {"compute": {"hasCustomData": False}}}, + ) + ) + + @mock.patch(MOCKPATH + "DataSourceAzure._poll_imds") def test_reprovision_calls__poll_imds(self, _poll_imds, isfile): """_reprovision will poll IMDS.""" isfile.return_value = False hostname = "myhost" username = "myuser" - odata = {'HostName': hostname, 'UserName': username} + odata = {"HostName": hostname, "UserName": username} _poll_imds.return_value = construct_valid_ovf_env(data=odata) dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) dsa._reprovision() @@ -2438,18 +2815,19 @@ class TestPreprovisioningShouldReprovision(CiTestCase): class TestPreprovisioningHotAttachNics(CiTestCase): - def setUp(self): super(TestPreprovisioningHotAttachNics, self).setUp() self.tmp = self.tmp_dir() - self.waagent_d = self.tmp_path('/var/lib/waagent', self.tmp) - self.paths = helpers.Paths({'cloud_dir': self.tmp}) - dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d - self.paths = helpers.Paths({'cloud_dir': self.tmp}) - - @mock.patch('cloudinit.sources.helpers.netlink.wait_for_nic_detach_event', - autospec=True) - @mock.patch(MOCKPATH + 'util.write_file', autospec=True) + self.waagent_d = self.tmp_path("/var/lib/waagent", self.tmp) + self.paths = helpers.Paths({"cloud_dir": self.tmp}) + dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d + self.paths = helpers.Paths({"cloud_dir": self.tmp}) + + @mock.patch( + "cloudinit.sources.helpers.netlink.wait_for_nic_detach_event", + autospec=True, + ) + @mock.patch(MOCKPATH + "util.write_file", autospec=True) def test_nic_detach_writes_marker(self, m_writefile, m_detach): """When we detect that a nic gets detached, we write a marker for it""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) @@ -2458,16 +2836,17 @@ class TestPreprovisioningHotAttachNics(CiTestCase): m_detach.assert_called_with(nl_sock) self.assertEqual(1, m_detach.call_count) m_writefile.assert_called_with( - dsaz.REPROVISION_NIC_DETACHED_MARKER_FILE, mock.ANY) + dsaz.REPROVISION_NIC_DETACHED_MARKER_FILE, mock.ANY + ) - @mock.patch(MOCKPATH + 'util.write_file', autospec=True) - @mock.patch(MOCKPATH + 'DataSourceAzure.fallback_interface') - @mock.patch(MOCKPATH + 'EphemeralDHCPv4WithReporting') - @mock.patch(MOCKPATH + 'DataSourceAzure._report_ready') - @mock.patch(MOCKPATH + 'DataSourceAzure._wait_for_nic_detach') + @mock.patch(MOCKPATH + "util.write_file", autospec=True) + @mock.patch(MOCKPATH + "DataSourceAzure.fallback_interface") + @mock.patch(MOCKPATH + "EphemeralDHCPv4WithReporting") + @mock.patch(MOCKPATH + "DataSourceAzure._report_ready") + @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") def test_detect_nic_attach_reports_ready_and_waits_for_detach( - self, m_detach, m_report_ready, m_dhcp, m_fallback_if, - m_writefile): + self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_writefile + ): """Report ready first and then wait for nic detach""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) dsa._wait_for_all_nics_ready() @@ -2476,16 +2855,18 @@ class TestPreprovisioningHotAttachNics(CiTestCase): self.assertEqual(1, m_detach.call_count) self.assertEqual(1, m_writefile.call_count) self.assertEqual(1, m_dhcp.call_count) - m_writefile.assert_called_with(dsaz.REPORTED_READY_MARKER_FILE, - mock.ANY) - - @mock.patch('os.path.isfile') - @mock.patch(MOCKPATH + 'DataSourceAzure.fallback_interface') - @mock.patch(MOCKPATH + 'EphemeralDHCPv4WithReporting') - @mock.patch(MOCKPATH + 'DataSourceAzure._report_ready') - @mock.patch(MOCKPATH + 'DataSourceAzure._wait_for_nic_detach') + m_writefile.assert_called_with( + dsaz.REPORTED_READY_MARKER_FILE, mock.ANY + ) + + @mock.patch("os.path.isfile") + @mock.patch(MOCKPATH + "DataSourceAzure.fallback_interface") + @mock.patch(MOCKPATH + "EphemeralDHCPv4WithReporting") + @mock.patch(MOCKPATH + "DataSourceAzure._report_ready") + @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") def test_detect_nic_attach_skips_report_ready_when_marker_present( - self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_isfile): + self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_isfile + ): """Skip reporting ready if we already have a marker file.""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) @@ -2499,13 +2880,14 @@ class TestPreprovisioningHotAttachNics(CiTestCase): self.assertEqual(0, m_dhcp.call_count) self.assertEqual(1, m_detach.call_count) - @mock.patch('os.path.isfile') - @mock.patch(MOCKPATH + 'DataSourceAzure.fallback_interface') - @mock.patch(MOCKPATH + 'EphemeralDHCPv4WithReporting') - @mock.patch(MOCKPATH + 'DataSourceAzure._report_ready') - @mock.patch(MOCKPATH + 'DataSourceAzure._wait_for_nic_detach') + @mock.patch("os.path.isfile") + @mock.patch(MOCKPATH + "DataSourceAzure.fallback_interface") + @mock.patch(MOCKPATH + "EphemeralDHCPv4WithReporting") + @mock.patch(MOCKPATH + "DataSourceAzure._report_ready") + @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") def test_detect_nic_attach_skips_nic_detach_when_marker_present( - self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_isfile): + self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_isfile + ): """Skip wait for nic detach if it already happened.""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) @@ -2516,22 +2898,32 @@ class TestPreprovisioningHotAttachNics(CiTestCase): self.assertEqual(0, m_dhcp.call_count) self.assertEqual(0, m_detach.call_count) - @mock.patch(MOCKPATH + 'DataSourceAzure.wait_for_link_up', autospec=True) - @mock.patch('cloudinit.sources.helpers.netlink.wait_for_nic_attach_event') - @mock.patch('cloudinit.sources.net.find_fallback_nic') - @mock.patch(MOCKPATH + 'get_metadata_from_imds') - @mock.patch(MOCKPATH + 'EphemeralDHCPv4') - @mock.patch(MOCKPATH + 'DataSourceAzure._wait_for_nic_detach') - @mock.patch('os.path.isfile') + @mock.patch(MOCKPATH + "DataSourceAzure.wait_for_link_up", autospec=True) + @mock.patch("cloudinit.sources.helpers.netlink.wait_for_nic_attach_event") + @mock.patch("cloudinit.sources.net.find_fallback_nic") + @mock.patch(MOCKPATH + "get_metadata_from_imds") + @mock.patch(MOCKPATH + "EphemeralDHCPv4") + @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") + @mock.patch("os.path.isfile") def test_wait_for_nic_attach_if_no_fallback_interface( - self, m_isfile, m_detach, m_dhcpv4, m_imds, m_fallback_if, - m_attach, m_link_up): + self, + m_isfile, + m_detach, + m_dhcpv4, + m_imds, + m_fallback_if, + m_attach, + m_link_up, + ): """Wait for nic attach if we do not have a fallback interface""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) lease = { - 'interface': 'eth9', 'fixed-address': '192.168.2.9', - 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0', - 'unknown-245': '624c3620'} + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } m_isfile.return_value = True m_attach.return_value = "eth0" @@ -2550,22 +2942,32 @@ class TestPreprovisioningHotAttachNics(CiTestCase): self.assertEqual(1, m_link_up.call_count) m_link_up.assert_called_with(mock.ANY, "eth0") - @mock.patch(MOCKPATH + 'DataSourceAzure.wait_for_link_up') - @mock.patch('cloudinit.sources.helpers.netlink.wait_for_nic_attach_event') - @mock.patch('cloudinit.sources.net.find_fallback_nic') - @mock.patch(MOCKPATH + 'DataSourceAzure.get_imds_data_with_api_fallback') - @mock.patch(MOCKPATH + 'EphemeralDHCPv4') - @mock.patch(MOCKPATH + 'DataSourceAzure._wait_for_nic_detach') - @mock.patch('os.path.isfile') + @mock.patch(MOCKPATH + "DataSourceAzure.wait_for_link_up") + @mock.patch("cloudinit.sources.helpers.netlink.wait_for_nic_attach_event") + @mock.patch("cloudinit.sources.net.find_fallback_nic") + @mock.patch(MOCKPATH + "DataSourceAzure.get_imds_data_with_api_fallback") + @mock.patch(MOCKPATH + "EphemeralDHCPv4") + @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") + @mock.patch("os.path.isfile") def test_wait_for_nic_attach_multinic_attach( - self, m_isfile, m_detach, m_dhcpv4, m_imds, m_fallback_if, - m_attach, m_link_up): + self, + m_isfile, + m_detach, + m_dhcpv4, + m_imds, + m_fallback_if, + m_attach, + m_link_up, + ): """Wait for nic attach if we do not have a fallback interface""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) lease = { - 'interface': 'eth9', 'fixed-address': '192.168.2.9', - 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0', - 'unknown-245': '624c3620'} + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } m_attach_call_count = 0 def nic_attach_ret(nl_sock, nics_found): @@ -2580,15 +2982,15 @@ class TestPreprovisioningHotAttachNics(CiTestCase): # Simulate two NICs by adding the same one twice. md = { "interface": [ - IMDS_NETWORK_METADATA['interface'][0], - IMDS_NETWORK_METADATA['interface'][0] + IMDS_NETWORK_METADATA["interface"][0], + IMDS_NETWORK_METADATA["interface"][0], ] } def network_metadata_ret(ifname, retries, type, exc_cb, infinite): if ifname == "eth0": return md - raise requests.Timeout('Fake connection timeout') + raise requests.Timeout("Fake connection timeout") m_isfile.return_value = True m_attach.side_effect = nic_attach_ret @@ -2607,25 +3009,29 @@ class TestPreprovisioningHotAttachNics(CiTestCase): self.assertEqual(1, m_imds.call_count) self.assertEqual(2, m_link_up.call_count) - @mock.patch(MOCKPATH + 'DataSourceAzure.get_imds_data_with_api_fallback') - @mock.patch(MOCKPATH + 'EphemeralDHCPv4') + @mock.patch(MOCKPATH + "DataSourceAzure.get_imds_data_with_api_fallback") + @mock.patch(MOCKPATH + "EphemeralDHCPv4") def test_check_if_nic_is_primary_retries_on_failures( - self, m_dhcpv4, m_imds): + self, m_dhcpv4, m_imds + ): """Retry polling for network metadata on all failures except timeout and network unreachable errors""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) lease = { - 'interface': 'eth9', 'fixed-address': '192.168.2.9', - 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0', - 'unknown-245': '624c3620'} + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } eth0Retries = [] eth1Retries = [] # Simulate two NICs by adding the same one twice. md = { "interface": [ - IMDS_NETWORK_METADATA['interface'][0], - IMDS_NETWORK_METADATA['interface'][0] + IMDS_NETWORK_METADATA["interface"][0], + IMDS_NETWORK_METADATA["interface"][0], ] } @@ -2645,9 +3051,9 @@ class TestPreprovisioningHotAttachNics(CiTestCase): # We are expected to retry for a certain period for both # timeout errors and network unreachable errors. if _ < 5: - cause = requests.Timeout('Fake connection timeout') + cause = requests.Timeout("Fake connection timeout") else: - cause = requests.ConnectionError('Network Unreachable') + cause = requests.ConnectionError("Network Unreachable") error = url_helper.UrlError(cause=cause) eth1Retries.append(exc_cb("Connection timeout", error)) # Should stop retrying after 10 retries @@ -2679,31 +3085,31 @@ class TestPreprovisioningHotAttachNics(CiTestCase): self.assertTrue(eth1Retries[i]) self.assertFalse(eth1Retries[10]) - @mock.patch('cloudinit.distros.networking.LinuxNetworking.try_set_link_up') - def test_wait_for_link_up_returns_if_already_up( - self, m_is_link_up): + @mock.patch("cloudinit.distros.networking.LinuxNetworking.try_set_link_up") + def test_wait_for_link_up_returns_if_already_up(self, m_is_link_up): """Waiting for link to be up should return immediately if the link is - already up.""" + already up.""" - distro_cls = distros.fetch('ubuntu') - distro = distro_cls('ubuntu', {}, self.paths) + distro_cls = distros.fetch("ubuntu") + distro = distro_cls("ubuntu", {}, self.paths) dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths) m_is_link_up.return_value = True dsa.wait_for_link_up("eth0") self.assertEqual(1, m_is_link_up.call_count) - @mock.patch(MOCKPATH + 'net.is_up', autospec=True) - @mock.patch(MOCKPATH + 'util.write_file') - @mock.patch('cloudinit.net.read_sys_net') - @mock.patch('cloudinit.distros.networking.LinuxNetworking.try_set_link_up') + @mock.patch(MOCKPATH + "net.is_up", autospec=True) + @mock.patch(MOCKPATH + "util.write_file") + @mock.patch("cloudinit.net.read_sys_net") + @mock.patch("cloudinit.distros.networking.LinuxNetworking.try_set_link_up") def test_wait_for_link_up_checks_link_after_sleep( - self, m_try_set_link_up, m_read_sys_net, m_writefile, m_is_up): + self, m_try_set_link_up, m_read_sys_net, m_writefile, m_is_up + ): """Waiting for link to be up should return immediately if the link is - already up.""" + already up.""" - distro_cls = distros.fetch('ubuntu') - distro = distro_cls('ubuntu', {}, self.paths) + distro_cls = distros.fetch("ubuntu") + distro = distro_cls("ubuntu", {}, self.paths) dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths) m_try_set_link_up.return_value = False @@ -2718,21 +3124,22 @@ class TestPreprovisioningHotAttachNics(CiTestCase): m_is_up.side_effect = is_up_mock - with mock.patch('cloudinit.sources.DataSourceAzure.sleep'): + with mock.patch("cloudinit.sources.DataSourceAzure.sleep"): dsa.wait_for_link_up("eth0") self.assertEqual(2, m_try_set_link_up.call_count) self.assertEqual(2, m_is_up.call_count) - @mock.patch(MOCKPATH + 'util.write_file') - @mock.patch('cloudinit.net.read_sys_net') - @mock.patch('cloudinit.distros.networking.LinuxNetworking.try_set_link_up') + @mock.patch(MOCKPATH + "util.write_file") + @mock.patch("cloudinit.net.read_sys_net") + @mock.patch("cloudinit.distros.networking.LinuxNetworking.try_set_link_up") def test_wait_for_link_up_writes_to_device_file( - self, m_is_link_up, m_read_sys_net, m_writefile): + self, m_is_link_up, m_read_sys_net, m_writefile + ): """Waiting for link to be up should return immediately if the link is - already up.""" + already up.""" - distro_cls = distros.fetch('ubuntu') - distro = distro_cls('ubuntu', {}, self.paths) + distro_cls = distros.fetch("ubuntu") + distro = distro_cls("ubuntu", {}, self.paths) dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths) callcount = 0 @@ -2751,48 +3158,59 @@ class TestPreprovisioningHotAttachNics(CiTestCase): self.assertEqual(1, m_read_sys_net.call_count) self.assertEqual(2, m_writefile.call_count) - @mock.patch('cloudinit.sources.helpers.netlink.' - 'create_bound_netlink_socket') + @mock.patch( + "cloudinit.sources.helpers.netlink.create_bound_netlink_socket" + ) def test_wait_for_all_nics_ready_raises_if_socket_fails(self, m_socket): """Waiting for all nics should raise exception if netlink socket - creation fails.""" + creation fails.""" m_socket.side_effect = netlink.NetlinkCreateSocketError - distro_cls = distros.fetch('ubuntu') - distro = distro_cls('ubuntu', {}, self.paths) + distro_cls = distros.fetch("ubuntu") + distro = distro_cls("ubuntu", {}, self.paths) dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths) - self.assertRaises(netlink.NetlinkCreateSocketError, - dsa._wait_for_all_nics_ready) + self.assertRaises( + netlink.NetlinkCreateSocketError, dsa._wait_for_all_nics_ready + ) # dsa._wait_for_all_nics_ready() -@mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network') -@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery') -@mock.patch('cloudinit.sources.helpers.netlink.' - 'wait_for_media_disconnect_connect') -@mock.patch('requests.Session.request') -@mock.patch(MOCKPATH + 'DataSourceAzure._report_ready') +@mock.patch("cloudinit.net.dhcp.EphemeralIPv4Network") +@mock.patch("cloudinit.net.dhcp.maybe_perform_dhcp_discovery") +@mock.patch( + "cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect" +) +@mock.patch("requests.Session.request") +@mock.patch(MOCKPATH + "DataSourceAzure._report_ready") class TestPreprovisioningPollIMDS(CiTestCase): - def setUp(self): super(TestPreprovisioningPollIMDS, self).setUp() self.tmp = self.tmp_dir() - self.waagent_d = self.tmp_path('/var/lib/waagent', self.tmp) - self.paths = helpers.Paths({'cloud_dir': self.tmp}) - dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d - - @mock.patch('time.sleep', mock.MagicMock()) - @mock.patch(MOCKPATH + 'EphemeralDHCPv4') - def test_poll_imds_re_dhcp_on_timeout(self, m_dhcpv4, m_report_ready, - m_request, m_media_switch, m_dhcp, - m_net): + self.waagent_d = self.tmp_path("/var/lib/waagent", self.tmp) + self.paths = helpers.Paths({"cloud_dir": self.tmp}) + dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d + + @mock.patch("time.sleep", mock.MagicMock()) + @mock.patch(MOCKPATH + "EphemeralDHCPv4") + def test_poll_imds_re_dhcp_on_timeout( + self, + m_dhcpv4, + m_report_ready, + m_request, + m_media_switch, + m_dhcp, + m_net, + ): """The poll_imds will retry DHCP on IMDS timeout.""" - report_file = self.tmp_path('report_marker', self.tmp) + report_file = self.tmp_path("report_marker", self.tmp) lease = { - 'interface': 'eth9', 'fixed-address': '192.168.2.9', - 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0', - 'unknown-245': '624c3620'} + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } m_dhcp.return_value = [lease] m_media_switch.return_value = None dhcp_ctx = mock.MagicMock(lease=lease) @@ -2804,7 +3222,7 @@ class TestPreprovisioningPollIMDS(CiTestCase): def fake_timeout_once(**kwargs): self.tries += 1 if self.tries == 1: - raise requests.Timeout('Fake connection timeout') + raise requests.Timeout("Fake connection timeout") elif self.tries in (2, 3): response = requests.Response() response.status_code = 404 if self.tries == 2 else 410 @@ -2817,41 +3235,54 @@ class TestPreprovisioningPollIMDS(CiTestCase): m_request.side_effect = fake_timeout_once dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) - with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file): + with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): dsa._poll_imds() self.assertEqual(m_report_ready.call_count, 1) m_report_ready.assert_called_with(lease=lease) - self.assertEqual(3, m_dhcpv4.call_count, 'Expected 3 DHCP calls') - self.assertEqual(4, self.tries, 'Expected 4 total reads from IMDS') + self.assertEqual(3, m_dhcpv4.call_count, "Expected 3 DHCP calls") + self.assertEqual(4, self.tries, "Expected 4 total reads from IMDS") - @mock.patch('os.path.isfile') + @mock.patch("os.path.isfile") def test_poll_imds_skips_dhcp_if_ctx_present( - self, m_isfile, report_ready_func, fake_resp, m_media_switch, - m_dhcp, m_net): + self, + m_isfile, + report_ready_func, + fake_resp, + m_media_switch, + m_dhcp, + m_net, + ): """The poll_imds function should reuse the dhcp ctx if it is already - present. This happens when we wait for nic to be hot-attached before - polling for reprovisiondata. Note that if this ctx is set when - _poll_imds is called, then it is not expected to be waiting for - media_disconnect_connect either.""" - report_file = self.tmp_path('report_marker', self.tmp) + present. This happens when we wait for nic to be hot-attached before + polling for reprovisiondata. Note that if this ctx is set when + _poll_imds is called, then it is not expected to be waiting for + media_disconnect_connect either.""" + report_file = self.tmp_path("report_marker", self.tmp) m_isfile.return_value = True dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) dsa._ephemeral_dhcp_ctx = "Dummy dhcp ctx" - with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file): + with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): dsa._poll_imds() self.assertEqual(0, m_dhcp.call_count) self.assertEqual(0, m_media_switch.call_count) - @mock.patch('os.path.isfile') - @mock.patch(MOCKPATH + 'EphemeralDHCPv4') + @mock.patch("os.path.isfile") + @mock.patch(MOCKPATH + "EphemeralDHCPv4") def test_poll_imds_does_dhcp_on_retries_if_ctx_present( - self, m_ephemeral_dhcpv4, m_isfile, report_ready_func, m_request, - m_media_switch, m_dhcp, m_net): + self, + m_ephemeral_dhcpv4, + m_isfile, + report_ready_func, + m_request, + m_media_switch, + m_dhcp, + m_net, + ): """The poll_imds function should reuse the dhcp ctx if it is already - present. This happens when we wait for nic to be hot-attached before - polling for reprovisiondata. Note that if this ctx is set when - _poll_imds is called, then it is not expected to be waiting for - media_disconnect_connect either.""" + present. This happens when we wait for nic to be hot-attached before + polling for reprovisiondata. Note that if this ctx is set when + _poll_imds is called, then it is not expected to be waiting for + media_disconnect_connect either.""" tries = 0 @@ -2859,15 +3290,16 @@ class TestPreprovisioningPollIMDS(CiTestCase): nonlocal tries tries += 1 if tries == 1: - raise requests.Timeout('Fake connection timeout') + raise requests.Timeout("Fake connection timeout") return mock.MagicMock(status_code=200, text="good", content="good") m_request.side_effect = fake_timeout_once - report_file = self.tmp_path('report_marker', self.tmp) + report_file = self.tmp_path("report_marker", self.tmp) m_isfile.return_value = True dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) - with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file),\ - mock.patch.object(dsa, '_ephemeral_dhcp_ctx') as m_dhcp_ctx: + with mock.patch( + MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file + ), mock.patch.object(dsa, "_ephemeral_dhcp_ctx") as m_dhcp_ctx: m_dhcp_ctx.obtain_lease.return_value = "Dummy lease" dsa._ephemeral_dhcp_ctx = m_dhcp_ctx dsa._poll_imds() @@ -2877,145 +3309,189 @@ class TestPreprovisioningPollIMDS(CiTestCase): self.assertEqual(2, m_request.call_count) def test_does_not_poll_imds_report_ready_when_marker_file_exists( - self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net): + self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net + ): """poll_imds should not call report ready when the reported ready marker file exists""" - report_file = self.tmp_path('report_marker', self.tmp) - write_file(report_file, content='dont run report_ready :)') - m_dhcp.return_value = [{ - 'interface': 'eth9', 'fixed-address': '192.168.2.9', - 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0', - 'unknown-245': '624c3620'}] + report_file = self.tmp_path("report_marker", self.tmp) + write_file(report_file, content="dont run report_ready :)") + m_dhcp.return_value = [ + { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + ] m_media_switch.return_value = None dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) - with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file): + with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): dsa._poll_imds() self.assertEqual(m_report_ready.call_count, 0) def test_poll_imds_report_ready_success_writes_marker_file( - self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net): + self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net + ): """poll_imds should write the report_ready marker file if reporting ready succeeds""" - report_file = self.tmp_path('report_marker', self.tmp) - m_dhcp.return_value = [{ - 'interface': 'eth9', 'fixed-address': '192.168.2.9', - 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0', - 'unknown-245': '624c3620'}] + report_file = self.tmp_path("report_marker", self.tmp) + m_dhcp.return_value = [ + { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + ] m_media_switch.return_value = None dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) self.assertFalse(os.path.exists(report_file)) - with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file): + with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): dsa._poll_imds() self.assertEqual(m_report_ready.call_count, 1) self.assertTrue(os.path.exists(report_file)) def test_poll_imds_report_ready_failure_raises_exc_and_doesnt_write_marker( - self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net): + self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net + ): """poll_imds should write the report_ready marker file if reporting ready succeeds""" - report_file = self.tmp_path('report_marker', self.tmp) - m_dhcp.return_value = [{ - 'interface': 'eth9', 'fixed-address': '192.168.2.9', - 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0', - 'unknown-245': '624c3620'}] + report_file = self.tmp_path("report_marker", self.tmp) + m_dhcp.return_value = [ + { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + ] m_media_switch.return_value = None m_report_ready.return_value = False dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) self.assertFalse(os.path.exists(report_file)) - with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file): - self.assertRaises( - InvalidMetaDataException, - dsa._poll_imds) + with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): + self.assertRaises(InvalidMetaDataException, dsa._poll_imds) self.assertEqual(m_report_ready.call_count, 1) self.assertFalse(os.path.exists(report_file)) -@mock.patch(MOCKPATH + 'DataSourceAzure._report_ready', mock.MagicMock()) -@mock.patch(MOCKPATH + 'subp.subp', mock.MagicMock()) -@mock.patch(MOCKPATH + 'util.write_file', mock.MagicMock()) -@mock.patch('cloudinit.sources.helpers.netlink.' - 'wait_for_media_disconnect_connect') -@mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network', autospec=True) -@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery') -@mock.patch('requests.Session.request') +@mock.patch(MOCKPATH + "DataSourceAzure._report_ready", mock.MagicMock()) +@mock.patch(MOCKPATH + "subp.subp", mock.MagicMock()) +@mock.patch(MOCKPATH + "util.write_file", mock.MagicMock()) +@mock.patch( + "cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect" +) +@mock.patch("cloudinit.net.dhcp.EphemeralIPv4Network", autospec=True) +@mock.patch("cloudinit.net.dhcp.maybe_perform_dhcp_discovery") +@mock.patch("requests.Session.request") class TestAzureDataSourcePreprovisioning(CiTestCase): - def setUp(self): super(TestAzureDataSourcePreprovisioning, self).setUp() tmp = self.tmp_dir() - self.waagent_d = self.tmp_path('/var/lib/waagent', tmp) - self.paths = helpers.Paths({'cloud_dir': tmp}) - dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d + self.waagent_d = self.tmp_path("/var/lib/waagent", tmp) + self.paths = helpers.Paths({"cloud_dir": tmp}) + dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d - def test_poll_imds_returns_ovf_env(self, m_request, - m_dhcp, m_net, - m_media_switch): + def test_poll_imds_returns_ovf_env( + self, m_request, m_dhcp, m_net, m_media_switch + ): """The _poll_imds method should return the ovf_env.xml.""" m_media_switch.return_value = None - m_dhcp.return_value = [{ - 'interface': 'eth9', 'fixed-address': '192.168.2.9', - 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0'}] - url = 'http://{0}/metadata/reprovisiondata?api-version=2019-06-01' + m_dhcp.return_value = [ + { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + } + ] + url = "http://{0}/metadata/reprovisiondata?api-version=2019-06-01" host = "169.254.169.254" full_url = url.format(host) - m_request.return_value = mock.MagicMock(status_code=200, text="ovf", - content="ovf") + m_request.return_value = mock.MagicMock( + status_code=200, text="ovf", content="ovf" + ) dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) self.assertTrue(len(dsa._poll_imds()) > 0) - self.assertEqual(m_request.call_args_list, - [mock.call(allow_redirects=True, - headers={'Metadata': 'true', - 'User-Agent': - 'Cloud-Init/%s' % vs() - }, method='GET', - timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS, - url=full_url)]) + self.assertEqual( + m_request.call_args_list, + [ + mock.call( + allow_redirects=True, + headers={ + "Metadata": "true", + "User-Agent": "Cloud-Init/%s" % vs(), + }, + method="GET", + timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS, + url=full_url, + ) + ], + ) self.assertEqual(m_dhcp.call_count, 2) m_net.assert_any_call( - broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9', - prefix_or_mask='255.255.255.0', router='192.168.2.1', - static_routes=None) + broadcast="192.168.2.255", + interface="eth9", + ip="192.168.2.9", + prefix_or_mask="255.255.255.0", + router="192.168.2.1", + static_routes=None, + ) self.assertEqual(m_net.call_count, 2) - def test__reprovision_calls__poll_imds(self, m_request, - m_dhcp, m_net, - m_media_switch): + def test__reprovision_calls__poll_imds( + self, m_request, m_dhcp, m_net, m_media_switch + ): """The _reprovision method should call poll IMDS.""" m_media_switch.return_value = None - m_dhcp.return_value = [{ - 'interface': 'eth9', 'fixed-address': '192.168.2.9', - 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0', - 'unknown-245': '624c3620'}] - url = 'http://{0}/metadata/reprovisiondata?api-version=2019-06-01' + m_dhcp.return_value = [ + { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + ] + url = "http://{0}/metadata/reprovisiondata?api-version=2019-06-01" host = "169.254.169.254" full_url = url.format(host) hostname = "myhost" username = "myuser" - odata = {'HostName': hostname, 'UserName': username} + odata = {"HostName": hostname, "UserName": username} content = construct_valid_ovf_env(data=odata) - m_request.return_value = mock.MagicMock(status_code=200, text=content, - content=content) + m_request.return_value = mock.MagicMock( + status_code=200, text=content, content=content + ) dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) md, _ud, cfg, _d = dsa._reprovision() - self.assertEqual(md['local-hostname'], hostname) - self.assertEqual(cfg['system_info']['default_user']['name'], username) + self.assertEqual(md["local-hostname"], hostname) + self.assertEqual(cfg["system_info"]["default_user"]["name"], username) self.assertIn( mock.call( allow_redirects=True, headers={ - 'Metadata': 'true', - 'User-Agent': 'Cloud-Init/%s' % vs() + "Metadata": "true", + "User-Agent": "Cloud-Init/%s" % vs(), }, - method='GET', + method="GET", timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS, - url=full_url + url=full_url, ), - m_request.call_args_list) + m_request.call_args_list, + ) self.assertEqual(m_dhcp.call_count, 2) m_net.assert_any_call( - broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9', - prefix_or_mask='255.255.255.0', router='192.168.2.1', - static_routes=None) + broadcast="192.168.2.255", + interface="eth9", + ip="192.168.2.9", + prefix_or_mask="255.255.255.0", + router="192.168.2.1", + static_routes=None, + ) self.assertEqual(m_net.call_count, 2) @@ -3029,36 +3505,42 @@ class TestRemoveUbuntuNetworkConfigScripts(CiTestCase): def test_remove_network_scripts_removes_both_files_and_directories(self): """Any files or directories in paths are removed when present.""" - file1 = self.tmp_path('file1', dir=self.tmp) - subdir = self.tmp_path('sub1', dir=self.tmp) - subfile = self.tmp_path('leaf1', dir=subdir) - write_file(file1, 'file1content') - write_file(subfile, 'leafcontent') + file1 = self.tmp_path("file1", dir=self.tmp) + subdir = self.tmp_path("sub1", dir=self.tmp) + subfile = self.tmp_path("leaf1", dir=subdir) + write_file(file1, "file1content") + write_file(subfile, "leafcontent") dsaz.maybe_remove_ubuntu_network_config_scripts(paths=[subdir, file1]) for path in (file1, subdir, subfile): - self.assertFalse(os.path.exists(path), - 'Found unremoved: %s' % path) + self.assertFalse( + os.path.exists(path), "Found unremoved: %s" % path + ) expected_logs = [ - 'INFO: Removing Ubuntu extended network scripts because cloud-init' - ' updates Azure network configuration on the following events:' + "INFO: Removing Ubuntu extended network scripts because cloud-init" + " updates Azure network configuration on the following events:" " ['boot', 'boot-legacy']", - 'Recursively deleting %s' % subdir, - 'Attempting to remove %s' % file1] + "Recursively deleting %s" % subdir, + "Attempting to remove %s" % file1, + ] for log in expected_logs: self.assertIn(log, self.logs.getvalue()) def test_remove_network_scripts_only_attempts_removal_if_path_exists(self): """Any files or directories absent are skipped without error.""" - dsaz.maybe_remove_ubuntu_network_config_scripts(paths=[ - self.tmp_path('nodirhere/', dir=self.tmp), - self.tmp_path('notfilehere', dir=self.tmp)]) - self.assertNotIn('/not/a', self.logs.getvalue()) # No delete logs - - @mock.patch(MOCKPATH + 'os.path.exists') - def test_remove_network_scripts_default_removes_stock_scripts(self, - m_exists): + dsaz.maybe_remove_ubuntu_network_config_scripts( + paths=[ + self.tmp_path("nodirhere/", dir=self.tmp), + self.tmp_path("notfilehere", dir=self.tmp), + ] + ) + self.assertNotIn("/not/a", self.logs.getvalue()) # No delete logs + + @mock.patch(MOCKPATH + "os.path.exists") + def test_remove_network_scripts_default_removes_stock_scripts( + self, m_exists + ): """Azure's stock ubuntu image scripts and artifacts are removed.""" # Report path absent on all to avoid delete operation m_exists.return_value = False @@ -3070,24 +3552,25 @@ class TestRemoveUbuntuNetworkConfigScripts(CiTestCase): class TestWBIsPlatformViable(CiTestCase): """White box tests for _is_platform_viable.""" + with_logs = True - @mock.patch(MOCKPATH + 'dmi.read_dmi_data') + @mock.patch(MOCKPATH + "dmi.read_dmi_data") def test_true_on_non_azure_chassis(self, m_read_dmi_data): """Return True if DMI chassis-asset-tag is AZURE_CHASSIS_ASSET_TAG.""" m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG - self.assertTrue(dsaz._is_platform_viable('doesnotmatter')) + self.assertTrue(dsaz._is_platform_viable("doesnotmatter")) - @mock.patch(MOCKPATH + 'os.path.exists') - @mock.patch(MOCKPATH + 'dmi.read_dmi_data') + @mock.patch(MOCKPATH + "os.path.exists") + @mock.patch(MOCKPATH + "dmi.read_dmi_data") def test_true_on_azure_ovf_env_in_seed_dir(self, m_read_dmi_data, m_exist): """Return True if ovf-env.xml exists in known seed dirs.""" # Non-matching Azure chassis-asset-tag - m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG + 'X' + m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG + "X" m_exist.return_value = True - self.assertTrue(dsaz._is_platform_viable('/some/seed/dir')) - m_exist.called_once_with('/other/seed/dir') + self.assertTrue(dsaz._is_platform_viable("/some/seed/dir")) + m_exist.called_once_with("/other/seed/dir") def test_false_on_no_matching_azure_criteria(self): """Report non-azure on unmatched asset tag, ovf-env absent and no dev. @@ -3096,17 +3579,25 @@ class TestWBIsPlatformViable(CiTestCase): AZURE_CHASSIS_ASSET_TAG, no ovf-env.xml files exist in known seed dirs and no devices have a label starting with prefix 'rd_rdfe_'. """ - self.assertFalse(wrap_and_call( - MOCKPATH, - {'os.path.exists': False, - # Non-matching Azure chassis-asset-tag - 'dmi.read_dmi_data': dsaz.AZURE_CHASSIS_ASSET_TAG + 'X', - 'subp.which': None}, - dsaz._is_platform_viable, 'doesnotmatter')) + self.assertFalse( + wrap_and_call( + MOCKPATH, + { + "os.path.exists": False, + # Non-matching Azure chassis-asset-tag + "dmi.read_dmi_data": dsaz.AZURE_CHASSIS_ASSET_TAG + "X", + "subp.which": None, + }, + dsaz._is_platform_viable, + "doesnotmatter", + ) + ) self.assertIn( "DEBUG: Non-Azure DMI asset tag '{0}' discovered.\n".format( - dsaz.AZURE_CHASSIS_ASSET_TAG + 'X'), - self.logs.getvalue()) + dsaz.AZURE_CHASSIS_ASSET_TAG + "X" + ), + self.logs.getvalue(), + ) class TestRandomSeed(CiTestCase): @@ -3120,13 +3611,14 @@ class TestRandomSeed(CiTestCase): path = resourceLocation("azure/non_unicode_random_string") result = dsaz._get_random_seed(path) - obj = {'seed': result} + obj = {"seed": result} try: serialized = json_dumps(obj) deserialized = load_json(serialized) except UnicodeDecodeError: self.fail("Non-serializable random seed returned") - self.assertEqual(deserialized['seed'], result) + self.assertEqual(deserialized["seed"], result) + # vi: ts=4 expandtab |