diff options
Diffstat (limited to 'tests/unittests/sources/test_azure.py')
-rw-r--r-- | tests/unittests/sources/test_azure.py | 4306 |
1 files changed, 4306 insertions, 0 deletions
diff --git a/tests/unittests/sources/test_azure.py b/tests/unittests/sources/test_azure.py new file mode 100644 index 00000000..5f956a63 --- /dev/null +++ b/tests/unittests/sources/test_azure.py @@ -0,0 +1,4306 @@ +# This file is part of cloud-init. See LICENSE file for license information. + +import copy +import crypt +import json +import os +import stat +import xml.etree.ElementTree as ET + +import httpretty +import pytest +import requests +import yaml + +from cloudinit import distros, helpers, url_helper +from cloudinit.sources import UNSET +from cloudinit.sources import DataSourceAzure as dsaz +from cloudinit.sources import InvalidMetaDataException +from cloudinit.sources.helpers import netlink +from cloudinit.util import ( + MountFailedError, + b64e, + decode_binary, + json_dumps, + load_file, + load_json, + write_file, +) +from cloudinit.version import version_string as vs +from tests.unittests.helpers import ( + CiTestCase, + ExitStack, + HttprettyTestCase, + mock, + populate_dir, + resourceLocation, + wrap_and_call, +) + +MOCKPATH = "cloudinit.sources.DataSourceAzure." + + +@pytest.fixture +def azure_ds(paths): + """Provide DataSourceAzure instance with mocks for minimal test case.""" + with mock.patch(MOCKPATH + "_is_platform_viable", return_value=True): + yield dsaz.DataSourceAzure(sys_cfg={}, distro=mock.Mock(), paths=paths) + + +@pytest.fixture +def mock_azure_helper_readurl(): + with mock.patch( + "cloudinit.sources.helpers.azure.url_helper.readurl", autospec=True + ) as m: + yield m + + +@pytest.fixture +def mock_azure_get_metadata_from_fabric(): + with mock.patch( + MOCKPATH + "get_metadata_from_fabric", + autospec=True, + ) as m: + yield m + + +@pytest.fixture +def mock_azure_report_failure_to_fabric(): + with mock.patch( + MOCKPATH + "report_failure_to_fabric", + autospec=True, + ) as m: + yield m + + +@pytest.fixture +def mock_dmi_read_dmi_data(): + def fake_read(key: str) -> str: + if key == "system-uuid": + return "fake-system-uuid" + raise RuntimeError() + + with mock.patch( + MOCKPATH + "dmi.read_dmi_data", + side_effect=fake_read, + autospec=True, + ) as m: + yield m + + +@pytest.fixture +def mock_net_dhcp_maybe_perform_dhcp_discovery(): + with mock.patch( + "cloudinit.net.dhcp.maybe_perform_dhcp_discovery", + return_value=[ + { + "unknown-245": "aa:bb:cc:dd", + "interface": "ethBoot0", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + } + ], + autospec=True, + ) as m: + yield m + + +@pytest.fixture +def mock_net_dhcp_EphemeralIPv4Network(): + with mock.patch( + "cloudinit.net.dhcp.EphemeralIPv4Network", + autospec=True, + ) as m: + yield m + + +@pytest.fixture +def mock_get_interfaces(): + with mock.patch(MOCKPATH + "net.get_interfaces", return_value=[]) as m: + yield m + + +@pytest.fixture +def mock_get_interface_mac(): + with mock.patch( + MOCKPATH + "net.get_interface_mac", + return_value="001122334455", + ) as m: + yield m + + +@pytest.fixture +def mock_netlink(): + with mock.patch( + MOCKPATH + "netlink", + autospec=True, + ) as m: + yield m + + +@pytest.fixture +def mock_os_path_isfile(): + with mock.patch(MOCKPATH + "os.path.isfile", autospec=True) as m: + yield m + + +@pytest.fixture +def mock_readurl(): + with mock.patch(MOCKPATH + "readurl", autospec=True) as m: + yield m + + +@pytest.fixture +def mock_subp_subp(): + with mock.patch(MOCKPATH + "subp.subp", side_effect=[]) as m: + yield m + + +@pytest.fixture +def mock_util_ensure_dir(): + with mock.patch( + MOCKPATH + "util.ensure_dir", + autospec=True, + ) as m: + yield m + + +@pytest.fixture +def mock_util_find_devs_with(): + with mock.patch(MOCKPATH + "util.find_devs_with", autospec=True) as m: + yield m + + +@pytest.fixture +def mock_util_load_file(): + with mock.patch( + MOCKPATH + "util.load_file", + autospec=True, + return_value=b"", + ) as m: + yield m + + +@pytest.fixture +def mock_util_mount_cb(): + with mock.patch( + MOCKPATH + "util.mount_cb", + autospec=True, + return_value=({}, "", {}, {}), + ) as m: + yield m + + +@pytest.fixture +def mock_util_write_file(): + with mock.patch( + MOCKPATH + "util.write_file", + autospec=True, + ) as m: + yield m + + +def construct_valid_ovf_env( + data=None, pubkeys=None, userdata=None, platform_settings=None +): + if data is None: + data = {"HostName": "FOOHOST"} + if pubkeys is None: + pubkeys = {} + + content = """<?xml version="1.0" encoding="utf-8"?> +<Environment xmlns="http://schemas.dmtf.org/ovf/environment/1" + xmlns:oe="http://schemas.dmtf.org/ovf/environment/1" + xmlns:wa="http://schemas.microsoft.com/windowsazure" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + + <wa:ProvisioningSection><wa:Version>1.0</wa:Version> + <LinuxProvisioningConfigurationSet + xmlns="http://schemas.microsoft.com/windowsazure" + xmlns:i="http://www.w3.org/2001/XMLSchema-instance"> + <ConfigurationSetType>LinuxProvisioningConfiguration</ConfigurationSetType> + """ + for key, dval in data.items(): + if isinstance(dval, dict): + val = dict(dval).get("text") + attrs = " " + " ".join( + [ + "%s='%s'" % (k, v) + for k, v in dict(dval).items() + if k != "text" + ] + ) + else: + val = dval + attrs = "" + content += "<%s%s>%s</%s>\n" % (key, attrs, val, key) + + if userdata: + content += "<UserData>%s</UserData>\n" % (b64e(userdata)) + + if pubkeys: + content += "<SSH><PublicKeys>\n" + for fp, path, value in pubkeys: + content += " <PublicKey>" + if fp and path: + content += "<Fingerprint>%s</Fingerprint><Path>%s</Path>" % ( + fp, + path, + ) + if value: + content += "<Value>%s</Value>" % value + content += "</PublicKey>\n" + content += "</PublicKeys></SSH>" + content += """ + </LinuxProvisioningConfigurationSet> + </wa:ProvisioningSection> + <wa:PlatformSettingsSection><wa:Version>1.0</wa:Version> + <PlatformSettings xmlns="http://schemas.microsoft.com/windowsazure" + xmlns:i="http://www.w3.org/2001/XMLSchema-instance"> + <KmsServerHostname>kms.core.windows.net</KmsServerHostname> + <ProvisionGuestAgent>false</ProvisionGuestAgent> + <GuestAgentPackageName i:nil="true" />""" + if platform_settings: + for k, v in platform_settings.items(): + content += "<%s>%s</%s>\n" % (k, v, k) + if "PreprovisionedVMType" not in platform_settings: + content += """<PreprovisionedVMType i:nil="true" />""" + content += """</PlatformSettings></wa:PlatformSettingsSection> +</Environment>""" + + return content + + +NETWORK_METADATA = { + "compute": { + "location": "eastus2", + "name": "my-hostname", + "offer": "UbuntuServer", + "osType": "Linux", + "placementGroupId": "", + "platformFaultDomain": "0", + "platformUpdateDomain": "0", + "publisher": "Canonical", + "resourceGroupName": "srugroup1", + "sku": "19.04-DAILY", + "subscriptionId": "12aad61c-6de4-4e53-a6c6-5aff52a83777", + "tags": "", + "version": "19.04.201906190", + "vmId": "ff702a6b-cb6a-4fcd-ad68-b4ce38227642", + "vmScaleSetName": "", + "vmSize": "Standard_DS1_v2", + "zone": "", + "publicKeys": [{"keyData": "ssh-rsa key1", "path": "path1"}], + }, + "network": { + "interface": [ + { + "macAddress": "000D3A047598", + "ipv6": {"ipAddress": []}, + "ipv4": { + "subnet": [{"prefix": "24", "address": "10.0.0.0"}], + "ipAddress": [ + { + "privateIpAddress": "10.0.0.4", + "publicIpAddress": "104.46.124.81", + } + ], + }, + } + ] + }, +} + +SECONDARY_INTERFACE = { + "macAddress": "220D3A047598", + "ipv6": {"ipAddress": []}, + "ipv4": { + "subnet": [{"prefix": "24", "address": "10.0.1.0"}], + "ipAddress": [ + { + "privateIpAddress": "10.0.1.5", + } + ], + }, +} + +SECONDARY_INTERFACE_NO_IP = { + "macAddress": "220D3A047598", + "ipv6": {"ipAddress": []}, + "ipv4": { + "subnet": [{"prefix": "24", "address": "10.0.1.0"}], + "ipAddress": [], + }, +} + +IMDS_NETWORK_METADATA = { + "interface": [ + { + "macAddress": "000D3A047598", + "ipv6": {"ipAddress": []}, + "ipv4": { + "subnet": [{"prefix": "24", "address": "10.0.0.0"}], + "ipAddress": [ + { + "privateIpAddress": "10.0.0.4", + "publicIpAddress": "104.46.124.81", + } + ], + }, + } + ] +} + +EXAMPLE_UUID = "d0df4c54-4ecb-4a4b-9954-5bdf3ed5c3b8" + + +class TestParseNetworkConfig(CiTestCase): + + maxDiff = None + fallback_config = { + "version": 1, + "config": [ + { + "type": "physical", + "name": "eth0", + "mac_address": "00:11:22:33:44:55", + "params": {"driver": "hv_netsvc"}, + "subnets": [{"type": "dhcp"}], + } + ], + } + + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + def test_single_ipv4_nic_configuration(self, m_driver): + """parse_network_config emits dhcp on single nic with ipv4""" + expected = { + "ethernets": { + "eth0": { + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": False, + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "set-name": "eth0", + } + }, + "version": 2, + } + self.assertEqual(expected, dsaz.parse_network_config(NETWORK_METADATA)) + + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + def test_increases_route_metric_for_non_primary_nics(self, m_driver): + """parse_network_config increases route-metric for each nic""" + expected = { + "ethernets": { + "eth0": { + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": False, + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "set-name": "eth0", + }, + "eth1": { + "set-name": "eth1", + "match": {"macaddress": "22:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 200}, + }, + "eth2": { + "set-name": "eth2", + "match": {"macaddress": "33:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 300}, + }, + }, + "version": 2, + } + imds_data = copy.deepcopy(NETWORK_METADATA) + imds_data["network"]["interface"].append(SECONDARY_INTERFACE) + third_intf = copy.deepcopy(SECONDARY_INTERFACE) + third_intf["macAddress"] = third_intf["macAddress"].replace("22", "33") + third_intf["ipv4"]["subnet"][0]["address"] = "10.0.2.0" + third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6" + imds_data["network"]["interface"].append(third_intf) + self.assertEqual(expected, dsaz.parse_network_config(imds_data)) + + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + def test_ipv4_and_ipv6_route_metrics_match_for_nics(self, m_driver): + """parse_network_config emits matching ipv4 and ipv6 route-metrics.""" + expected = { + "ethernets": { + "eth0": { + "addresses": ["10.0.0.5/24", "2001:dead:beef::2/128"], + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": True, + "dhcp6-overrides": {"route-metric": 100}, + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "set-name": "eth0", + }, + "eth1": { + "set-name": "eth1", + "match": {"macaddress": "22:0d:3a:04:75:98"}, + "dhcp4": True, + "dhcp6": False, + "dhcp4-overrides": {"route-metric": 200}, + }, + "eth2": { + "set-name": "eth2", + "match": {"macaddress": "33:0d:3a:04:75:98"}, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 300}, + "dhcp6": True, + "dhcp6-overrides": {"route-metric": 300}, + }, + }, + "version": 2, + } + imds_data = copy.deepcopy(NETWORK_METADATA) + nic1 = imds_data["network"]["interface"][0] + nic1["ipv4"]["ipAddress"].append({"privateIpAddress": "10.0.0.5"}) + + nic1["ipv6"] = { + "subnet": [{"address": "2001:dead:beef::16"}], + "ipAddress": [ + {"privateIpAddress": "2001:dead:beef::1"}, + {"privateIpAddress": "2001:dead:beef::2"}, + ], + } + imds_data["network"]["interface"].append(SECONDARY_INTERFACE) + third_intf = copy.deepcopy(SECONDARY_INTERFACE) + third_intf["macAddress"] = third_intf["macAddress"].replace("22", "33") + third_intf["ipv4"]["subnet"][0]["address"] = "10.0.2.0" + third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6" + third_intf["ipv6"] = { + "subnet": [{"prefix": "64", "address": "2001:dead:beef::2"}], + "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}], + } + imds_data["network"]["interface"].append(third_intf) + self.assertEqual(expected, dsaz.parse_network_config(imds_data)) + + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + def test_ipv4_secondary_ips_will_be_static_addrs(self, m_driver): + """parse_network_config emits primary ipv4 as dhcp others are static""" + expected = { + "ethernets": { + "eth0": { + "addresses": ["10.0.0.5/24"], + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": True, + "dhcp6-overrides": {"route-metric": 100}, + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "set-name": "eth0", + } + }, + "version": 2, + } + imds_data = copy.deepcopy(NETWORK_METADATA) + nic1 = imds_data["network"]["interface"][0] + nic1["ipv4"]["ipAddress"].append({"privateIpAddress": "10.0.0.5"}) + + nic1["ipv6"] = { + "subnet": [{"prefix": "10", "address": "2001:dead:beef::16"}], + "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}], + } + self.assertEqual(expected, dsaz.parse_network_config(imds_data)) + + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + def test_ipv6_secondary_ips_will_be_static_cidrs(self, m_driver): + """parse_network_config emits primary ipv6 as dhcp others are static""" + expected = { + "ethernets": { + "eth0": { + "addresses": ["10.0.0.5/24", "2001:dead:beef::2/10"], + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": True, + "dhcp6-overrides": {"route-metric": 100}, + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "set-name": "eth0", + } + }, + "version": 2, + } + imds_data = copy.deepcopy(NETWORK_METADATA) + nic1 = imds_data["network"]["interface"][0] + nic1["ipv4"]["ipAddress"].append({"privateIpAddress": "10.0.0.5"}) + + # Secondary ipv6 addresses currently ignored/unconfigured + nic1["ipv6"] = { + "subnet": [{"prefix": "10", "address": "2001:dead:beef::16"}], + "ipAddress": [ + {"privateIpAddress": "2001:dead:beef::1"}, + {"privateIpAddress": "2001:dead:beef::2"}, + ], + } + self.assertEqual(expected, dsaz.parse_network_config(imds_data)) + + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", + return_value="hv_netvsc", + ) + def test_match_driver_for_netvsc(self, m_driver): + """parse_network_config emits driver when using netvsc.""" + expected = { + "ethernets": { + "eth0": { + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": False, + "match": { + "macaddress": "00:0d:3a:04:75:98", + "driver": "hv_netvsc", + }, + "set-name": "eth0", + } + }, + "version": 2, + } + self.assertEqual(expected, dsaz.parse_network_config(NETWORK_METADATA)) + + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + @mock.patch("cloudinit.net.generate_fallback_config") + def test_parse_network_config_uses_fallback_cfg_when_no_network_metadata( + self, m_fallback_config, m_driver + ): + """parse_network_config generates fallback network config when the + IMDS instance metadata is corrupted/invalid, such as when + network metadata is not present. + """ + imds_metadata_missing_network_metadata = copy.deepcopy( + NETWORK_METADATA + ) + del imds_metadata_missing_network_metadata["network"] + m_fallback_config.return_value = self.fallback_config + self.assertEqual( + self.fallback_config, + dsaz.parse_network_config(imds_metadata_missing_network_metadata), + ) + + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + @mock.patch("cloudinit.net.generate_fallback_config") + def test_parse_network_config_uses_fallback_cfg_when_no_interface_metadata( + self, m_fallback_config, m_driver + ): + """parse_network_config generates fallback network config when the + IMDS instance metadata is corrupted/invalid, such as when + network interface metadata is not present. + """ + imds_metadata_missing_interface_metadata = copy.deepcopy( + NETWORK_METADATA + ) + del imds_metadata_missing_interface_metadata["network"]["interface"] + m_fallback_config.return_value = self.fallback_config + self.assertEqual( + self.fallback_config, + dsaz.parse_network_config( + imds_metadata_missing_interface_metadata + ), + ) + + +class TestGetMetadataFromIMDS(HttprettyTestCase): + + with_logs = True + + def setUp(self): + super(TestGetMetadataFromIMDS, self).setUp() + self.network_md_url = "{}/instance?api-version=2019-06-01".format( + dsaz.IMDS_URL + ) + + @mock.patch(MOCKPATH + "readurl", autospec=True) + def test_get_metadata_uses_instance_url(self, m_readurl): + """Make sure readurl is called with the correct url when accessing + metadata""" + m_readurl.return_value = url_helper.StringResponse( + json.dumps(IMDS_NETWORK_METADATA).encode("utf-8") + ) + + dsaz.get_metadata_from_imds(retries=3, md_type=dsaz.MetadataType.ALL) + m_readurl.assert_called_with( + "http://169.254.169.254/metadata/instance?api-version=2019-06-01", + exception_cb=mock.ANY, + headers=mock.ANY, + retries=mock.ANY, + timeout=mock.ANY, + infinite=False, + ) + + @mock.patch(MOCKPATH + "readurl", autospec=True) + def test_get_network_metadata_uses_network_url(self, m_readurl): + """Make sure readurl is called with the correct url when accessing + network metadata""" + m_readurl.return_value = url_helper.StringResponse( + json.dumps(IMDS_NETWORK_METADATA).encode("utf-8") + ) + + dsaz.get_metadata_from_imds( + retries=3, md_type=dsaz.MetadataType.NETWORK + ) + m_readurl.assert_called_with( + "http://169.254.169.254/metadata/instance/network?api-version=" + "2019-06-01", + exception_cb=mock.ANY, + headers=mock.ANY, + retries=mock.ANY, + timeout=mock.ANY, + infinite=False, + ) + + @mock.patch(MOCKPATH + "readurl", autospec=True) + @mock.patch(MOCKPATH + "EphemeralDHCPv4", autospec=True) + def test_get_default_metadata_uses_instance_url(self, m_dhcp, m_readurl): + """Make sure readurl is called with the correct url when accessing + metadata""" + m_readurl.return_value = url_helper.StringResponse( + json.dumps(IMDS_NETWORK_METADATA).encode("utf-8") + ) + + dsaz.get_metadata_from_imds(retries=3) + m_readurl.assert_called_with( + "http://169.254.169.254/metadata/instance?api-version=2019-06-01", + exception_cb=mock.ANY, + headers=mock.ANY, + retries=mock.ANY, + timeout=mock.ANY, + infinite=False, + ) + + @mock.patch(MOCKPATH + "readurl", autospec=True) + def test_get_metadata_uses_extended_url(self, m_readurl): + """Make sure readurl is called with the correct url when accessing + metadata""" + m_readurl.return_value = url_helper.StringResponse( + json.dumps(IMDS_NETWORK_METADATA).encode("utf-8") + ) + + dsaz.get_metadata_from_imds( + retries=3, + md_type=dsaz.MetadataType.ALL, + api_version="2021-08-01", + ) + m_readurl.assert_called_with( + "http://169.254.169.254/metadata/instance?api-version=" + "2021-08-01&extended=true", + exception_cb=mock.ANY, + headers=mock.ANY, + retries=mock.ANY, + timeout=mock.ANY, + infinite=False, + ) + + @mock.patch(MOCKPATH + "readurl", autospec=True) + def test_get_metadata_performs_dhcp_when_network_is_down(self, m_readurl): + """Perform DHCP setup when nic is not up.""" + m_readurl.return_value = url_helper.StringResponse( + json.dumps(NETWORK_METADATA).encode("utf-8") + ) + + self.assertEqual( + NETWORK_METADATA, dsaz.get_metadata_from_imds(retries=2) + ) + + self.assertIn( + "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time + self.logs.getvalue(), + ) + + m_readurl.assert_called_with( + self.network_md_url, + exception_cb=mock.ANY, + headers={"Metadata": "true"}, + retries=2, + timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS, + infinite=False, + ) + + @mock.patch("cloudinit.url_helper.time.sleep") + def test_get_metadata_from_imds_empty_when_no_imds_present(self, m_sleep): + """Return empty dict when IMDS network metadata is absent.""" + httpretty.register_uri( + httpretty.GET, + dsaz.IMDS_URL + "/instance?api-version=2017-12-01", + body={}, + status=404, + ) + + self.assertEqual({}, dsaz.get_metadata_from_imds(retries=2)) + + self.assertEqual([mock.call(1), mock.call(1)], m_sleep.call_args_list) + self.assertIn( + "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time + self.logs.getvalue(), + ) + + @mock.patch("requests.Session.request") + @mock.patch("cloudinit.url_helper.time.sleep") + def test_get_metadata_from_imds_retries_on_timeout( + self, m_sleep, m_request + ): + """Retry IMDS network metadata on timeout errors.""" + + self.attempt = 0 + m_request.side_effect = requests.Timeout("Fake Connection Timeout") + + def retry_callback(request, uri, headers): + self.attempt += 1 + raise requests.Timeout("Fake connection timeout") + + httpretty.register_uri( + httpretty.GET, + dsaz.IMDS_URL + "instance?api-version=2017-12-01", + body=retry_callback, + ) + + self.assertEqual({}, dsaz.get_metadata_from_imds(retries=3)) + + self.assertEqual([mock.call(1)] * 3, m_sleep.call_args_list) + self.assertIn( + "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time + self.logs.getvalue(), + ) + + +class TestAzureDataSource(CiTestCase): + + with_logs = True + + def setUp(self): + super(TestAzureDataSource, self).setUp() + self.tmp = self.tmp_dir() + + # patch cloud_dir, so our 'seed_dir' is guaranteed empty + self.paths = helpers.Paths( + {"cloud_dir": self.tmp, "run_dir": self.tmp} + ) + self.waagent_d = os.path.join(self.tmp, "var", "lib", "waagent") + + self.patches = ExitStack() + self.addCleanup(self.patches.close) + + self.patches.enter_context( + mock.patch.object(dsaz, "_get_random_seed", return_value="wild") + ) + + self.m_dhcp = self.patches.enter_context( + mock.patch.object( + dsaz, + "EphemeralDHCPv4", + autospec=True, + ) + ) + self.m_dhcp.return_value.lease = {} + self.m_dhcp.return_value.iface = "eth4" + + self.m_get_metadata_from_imds = self.patches.enter_context( + mock.patch.object( + dsaz, + "get_metadata_from_imds", + mock.MagicMock(return_value=NETWORK_METADATA), + ) + ) + self.m_fallback_nic = self.patches.enter_context( + mock.patch( + "cloudinit.sources.net.find_fallback_nic", return_value="eth9" + ) + ) + self.m_remove_ubuntu_network_scripts = self.patches.enter_context( + mock.patch.object( + dsaz, + "maybe_remove_ubuntu_network_config_scripts", + mock.MagicMock(), + ) + ) + super(TestAzureDataSource, self).setUp() + + def apply_patches(self, patches): + for module, name, new in patches: + self.patches.enter_context(mock.patch.object(module, name, new)) + + def _get_mockds(self): + sysctl_out = ( + "dev.storvsc.3.%pnpinfo: " + "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f " + "deviceid=f8b3781b-1e82-4818-a1c3-63d806ec15bb\n" + ) + sysctl_out += ( + "dev.storvsc.2.%pnpinfo: " + "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f " + "deviceid=f8b3781a-1e82-4818-a1c3-63d806ec15bb\n" + ) + sysctl_out += ( + "dev.storvsc.1.%pnpinfo: " + "classid=32412632-86cb-44a2-9b5c-50d1417354f5 " + "deviceid=00000000-0001-8899-0000-000000000000\n" + ) + camctl_devbus = """ +scbus0 on ata0 bus 0 +scbus1 on ata1 bus 0 +scbus2 on blkvsc0 bus 0 +scbus3 on blkvsc1 bus 0 +scbus4 on storvsc2 bus 0 +scbus5 on storvsc3 bus 0 +scbus-1 on xpt0 bus 0 + """ + camctl_dev = """ +<Msft Virtual CD/ROM 1.0> at scbus1 target 0 lun 0 (cd0,pass0) +<Msft Virtual Disk 1.0> at scbus2 target 0 lun 0 (da0,pass1) +<Msft Virtual Disk 1.0> at scbus3 target 1 lun 0 (da1,pass2) + """ + self.apply_patches( + [ + ( + dsaz, + "get_dev_storvsc_sysctl", + mock.MagicMock(return_value=sysctl_out), + ), + ( + dsaz, + "get_camcontrol_dev_bus", + mock.MagicMock(return_value=camctl_devbus), + ), + ( + dsaz, + "get_camcontrol_dev", + mock.MagicMock(return_value=camctl_dev), + ), + ] + ) + return dsaz + + def _get_ds( + self, + data, + distro="ubuntu", + apply_network=None, + instance_id=None, + write_ovf_to_data_dir: bool = False, + write_ovf_to_seed_dir: bool = True, + ): + def _wait_for_files(flist, _maxwait=None, _naplen=None): + data["waited"] = flist + return [] + + def _load_possible_azure_ds(seed_dir, cache_dir): + yield seed_dir + yield dsaz.DEFAULT_PROVISIONING_ISO_DEV + yield from data.get("dsdevs", []) + if cache_dir: + yield cache_dir + + seed_dir = os.path.join(self.paths.seed_dir, "azure") + if write_ovf_to_seed_dir and data.get("ovfcontent") is not None: + populate_dir(seed_dir, {"ovf-env.xml": data["ovfcontent"]}) + + if write_ovf_to_data_dir and data.get("ovfcontent") is not None: + populate_dir(self.waagent_d, {"ovf-env.xml": data["ovfcontent"]}) + + dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d + + self.m_is_platform_viable = mock.MagicMock(autospec=True) + self.m_get_metadata_from_fabric = mock.MagicMock(return_value=[]) + self.m_report_failure_to_fabric = mock.MagicMock(autospec=True) + self.m_get_interfaces = mock.MagicMock( + return_value=[ + ("dummy0", "9e:65:d6:19:19:01", None, None), + ("eth0", "00:15:5d:69:63:ba", "hv_netvsc", "0x3"), + ("lo", "00:00:00:00:00:00", None, None), + ] + ) + self.m_list_possible_azure_ds = mock.MagicMock( + side_effect=_load_possible_azure_ds + ) + + if instance_id: + self.instance_id = instance_id + else: + self.instance_id = EXAMPLE_UUID + + def _dmi_mocks(key): + if key == "system-uuid": + return self.instance_id + elif key == "chassis-asset-tag": + return "7783-7084-3265-9085-8269-3286-77" + + self.apply_patches( + [ + ( + dsaz, + "list_possible_azure_ds", + self.m_list_possible_azure_ds, + ), + (dsaz, "_is_platform_viable", self.m_is_platform_viable), + ( + dsaz, + "get_metadata_from_fabric", + self.m_get_metadata_from_fabric, + ), + ( + dsaz, + "report_failure_to_fabric", + self.m_report_failure_to_fabric, + ), + (dsaz, "get_boot_telemetry", mock.MagicMock()), + (dsaz, "get_system_info", mock.MagicMock()), + ( + dsaz.net, + "get_interface_mac", + mock.MagicMock(return_value="00:15:5d:69:63:ba"), + ), + ( + dsaz.net, + "get_interfaces", + self.m_get_interfaces, + ), + (dsaz.subp, "which", lambda x: True), + ( + dsaz.dmi, + "read_dmi_data", + mock.MagicMock(side_effect=_dmi_mocks), + ), + ( + dsaz.util, + "wait_for_files", + mock.MagicMock(side_effect=_wait_for_files), + ), + ] + ) + + if isinstance(distro, str): + distro_cls = distros.fetch(distro) + distro = distro_cls(distro, data.get("sys_cfg", {}), self.paths) + dsrc = dsaz.DataSourceAzure( + data.get("sys_cfg", {}), distro=distro, paths=self.paths + ) + if apply_network is not None: + dsrc.ds_cfg["apply_network_config"] = apply_network + + return dsrc + + def _get_and_setup(self, dsrc): + ret = dsrc.get_data() + if ret: + dsrc.setup(True) + return ret + + def xml_equals(self, oxml, nxml): + """Compare two sets of XML to make sure they are equal""" + + def create_tag_index(xml): + et = ET.fromstring(xml) + ret = {} + for x in et.iter(): + ret[x.tag] = x + return ret + + def tags_exists(x, y): + for tag in x.keys(): + assert tag in y + for tag in y.keys(): + assert tag in x + + def tags_equal(x, y): + for x_val in x.values(): + y_val = y.get(x_val.tag) + assert x_val.text == y_val.text + + old_cnt = create_tag_index(oxml) + new_cnt = create_tag_index(nxml) + tags_exists(old_cnt, new_cnt) + tags_equal(old_cnt, new_cnt) + + def xml_notequals(self, oxml, nxml): + try: + self.xml_equals(oxml, nxml) + except AssertionError: + return + raise AssertionError("XML is the same") + + def test_get_resource_disk(self): + ds = self._get_mockds() + dev = ds.get_resource_disk_on_freebsd(1) + self.assertEqual("da1", dev) + + def test_not_is_platform_viable_seed_should_return_no_datasource(self): + """Check seed_dir using _is_platform_viable and return False.""" + # Return a non-matching asset tag value + data = {} + dsrc = self._get_ds(data) + self.m_is_platform_viable.return_value = False + with mock.patch.object( + dsrc, "crawl_metadata" + ) as m_crawl_metadata, mock.patch.object( + dsrc, "_report_failure" + ) as m_report_failure: + ret = dsrc.get_data() + self.m_is_platform_viable.assert_called_with(dsrc.seed_dir) + self.assertFalse(ret) + # Assert that for non viable platforms, + # there is no communication with the Azure datasource. + self.assertEqual(0, m_crawl_metadata.call_count) + self.assertEqual(0, m_report_failure.call_count) + + def test_platform_viable_but_no_devs_should_return_no_datasource(self): + """For platforms where the Azure platform is viable + (which is indicated by the matching asset tag), + the absence of any devs at all (devs == candidate sources + for crawling Azure datasource) is NOT expected. + Report failure to Azure as this is an unexpected fatal error. + """ + data = {} + dsrc = self._get_ds(data) + with mock.patch.object(dsrc, "_report_failure") as m_report_failure: + self.m_is_platform_viable.return_value = True + ret = dsrc.get_data() + self.m_is_platform_viable.assert_called_with(dsrc.seed_dir) + self.assertFalse(ret) + self.assertEqual(1, m_report_failure.call_count) + + def test_crawl_metadata_exception_returns_no_datasource(self): + data = {} + dsrc = self._get_ds(data) + self.m_is_platform_viable.return_value = True + with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: + m_crawl_metadata.side_effect = Exception + ret = dsrc.get_data() + self.m_is_platform_viable.assert_called_with(dsrc.seed_dir) + self.assertEqual(1, m_crawl_metadata.call_count) + self.assertFalse(ret) + + def test_crawl_metadata_exception_should_report_failure_with_msg(self): + data = {} + dsrc = self._get_ds(data) + self.m_is_platform_viable.return_value = True + with mock.patch.object( + dsrc, "crawl_metadata" + ) as m_crawl_metadata, mock.patch.object( + dsrc, "_report_failure" + ) as m_report_failure: + m_crawl_metadata.side_effect = Exception + dsrc.get_data() + self.assertEqual(1, m_crawl_metadata.call_count) + m_report_failure.assert_called_once_with( + description=dsaz.DEFAULT_REPORT_FAILURE_USER_VISIBLE_MESSAGE + ) + + def test_crawl_metadata_exc_should_log_could_not_crawl_msg(self): + data = {} + dsrc = self._get_ds(data) + self.m_is_platform_viable.return_value = True + with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: + m_crawl_metadata.side_effect = Exception + dsrc.get_data() + self.assertEqual(1, m_crawl_metadata.call_count) + self.assertIn( + "Could not crawl Azure metadata", self.logs.getvalue() + ) + + def test_basic_seed_dir(self): + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(dsrc.userdata_raw, "") + self.assertEqual(dsrc.metadata["local-hostname"], odata["HostName"]) + self.assertTrue( + os.path.isfile(os.path.join(self.waagent_d, "ovf-env.xml")) + ) + self.assertEqual("azure", dsrc.cloud_name) + self.assertEqual("azure", dsrc.platform_type) + self.assertEqual( + "seed-dir (%s/seed/azure)" % self.tmp, dsrc.subplatform + ) + + def test_data_dir_without_imds_data(self): + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } + dsrc = self._get_ds( + data, write_ovf_to_data_dir=True, write_ovf_to_seed_dir=False + ) + + self.m_get_metadata_from_imds.return_value = {} + with mock.patch(MOCKPATH + "util.mount_cb") as m_mount_cb: + m_mount_cb.side_effect = [ + MountFailedError("fail"), + ({"local-hostname": "me"}, "ud", {"cfg": ""}, {}), + ] + ret = dsrc.get_data() + + self.assertTrue(ret) + self.assertEqual(dsrc.userdata_raw, "") + self.assertEqual(dsrc.metadata["local-hostname"], odata["HostName"]) + self.assertTrue( + os.path.isfile(os.path.join(self.waagent_d, "ovf-env.xml")) + ) + self.assertEqual("azure", dsrc.cloud_name) + self.assertEqual("azure", dsrc.platform_type) + self.assertEqual("seed-dir (%s)" % self.waagent_d, dsrc.subplatform) + + def test_basic_dev_file(self): + """When a device path is used, present that in subplatform.""" + data = {"sys_cfg": {}, "dsdevs": ["/dev/cd0"]} + dsrc = self._get_ds(data) + # DSAzure will attempt to mount /dev/sr0 first, which should + # fail with mount error since the list of devices doesn't have + # /dev/sr0 + with mock.patch(MOCKPATH + "util.mount_cb") as m_mount_cb: + m_mount_cb.side_effect = [ + MountFailedError("fail"), + ({"local-hostname": "me"}, "ud", {"cfg": ""}, {}), + ] + self.assertTrue(dsrc.get_data()) + self.assertEqual(dsrc.userdata_raw, "ud") + self.assertEqual(dsrc.metadata["local-hostname"], "me") + self.assertEqual("azure", dsrc.cloud_name) + self.assertEqual("azure", dsrc.platform_type) + self.assertEqual("config-disk (/dev/cd0)", dsrc.subplatform) + + def test_get_data_non_ubuntu_will_not_remove_network_scripts(self): + """get_data on non-Ubuntu will not remove ubuntu net scripts.""" + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } + + dsrc = self._get_ds(data, distro="debian") + dsrc.get_data() + self.m_remove_ubuntu_network_scripts.assert_not_called() + + def test_get_data_on_ubuntu_will_remove_network_scripts(self): + """get_data will remove ubuntu net scripts on Ubuntu distro.""" + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + + dsrc = self._get_ds(data, distro="ubuntu") + dsrc.get_data() + self.m_remove_ubuntu_network_scripts.assert_called_once_with() + + def test_get_data_on_ubuntu_will_not_remove_network_scripts_disabled(self): + """When apply_network_config false, do not remove scripts on Ubuntu.""" + sys_cfg = {"datasource": {"Azure": {"apply_network_config": False}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + + dsrc = self._get_ds(data, distro="ubuntu") + dsrc.get_data() + self.m_remove_ubuntu_network_scripts.assert_not_called() + + def test_crawl_metadata_returns_structured_data_and_caches_nothing(self): + """Return all structured metadata and cache no class attributes.""" + yaml_cfg = "" + odata = { + "HostName": "myhost", + "UserName": "myuser", + "UserData": {"text": "FOOBAR", "encoding": "plain"}, + "dscfg": {"text": yaml_cfg, "encoding": "plain"}, + } + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } + dsrc = self._get_ds(data) + expected_cfg = { + "PreprovisionedVMType": None, + "PreprovisionedVm": False, + "datasource": {"Azure": {}}, + "system_info": {"default_user": {"name": "myuser"}}, + } + expected_metadata = { + "azure_data": { + "configurationsettype": "LinuxProvisioningConfiguration" + }, + "imds": NETWORK_METADATA, + "instance-id": EXAMPLE_UUID, + "local-hostname": "myhost", + "random_seed": "wild", + } + + crawled_metadata = dsrc.crawl_metadata() + + self.assertCountEqual( + crawled_metadata.keys(), + ["cfg", "files", "metadata", "userdata_raw"], + ) + self.assertEqual(crawled_metadata["cfg"], expected_cfg) + self.assertEqual( + list(crawled_metadata["files"].keys()), ["ovf-env.xml"] + ) + self.assertIn( + b"<HostName>myhost</HostName>", + crawled_metadata["files"]["ovf-env.xml"], + ) + self.assertEqual(crawled_metadata["metadata"], expected_metadata) + self.assertEqual(crawled_metadata["userdata_raw"], "FOOBAR") + self.assertEqual(dsrc.userdata_raw, None) + self.assertEqual(dsrc.metadata, {}) + self.assertEqual(dsrc._metadata_imds, UNSET) + self.assertFalse( + os.path.isfile(os.path.join(self.waagent_d, "ovf-env.xml")) + ) + + def test_crawl_metadata_raises_invalid_metadata_on_error(self): + """crawl_metadata raises an exception on invalid ovf-env.xml.""" + data = {"ovfcontent": "BOGUS", "sys_cfg": {}} + dsrc = self._get_ds(data) + error_msg = ( + "BrokenAzureDataSource: Invalid ovf-env.xml:" + " syntax error: line 1, column 0" + ) + with self.assertRaises(InvalidMetaDataException) as cm: + dsrc.crawl_metadata() + self.assertEqual(str(cm.exception), error_msg) + + def test_crawl_metadata_call_imds_once_no_reprovision(self): + """If reprovisioning, report ready at the end""" + ovfenv = construct_valid_ovf_env( + platform_settings={"PreprovisionedVm": "False"} + ) + + data = {"ovfcontent": ovfenv, "sys_cfg": {}} + dsrc = self._get_ds(data) + dsrc.crawl_metadata() + self.assertEqual(1, self.m_get_metadata_from_imds.call_count) + + @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") + @mock.patch( + "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" + ) + @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds") + def test_crawl_metadata_call_imds_twice_with_reprovision( + self, poll_imds_func, m_report_ready, m_write + ): + """If reprovisioning, imds metadata will be fetched twice""" + ovfenv = construct_valid_ovf_env( + platform_settings={"PreprovisionedVm": "True"} + ) + + data = {"ovfcontent": ovfenv, "sys_cfg": {}} + dsrc = self._get_ds(data) + poll_imds_func.return_value = ovfenv + dsrc.crawl_metadata() + self.assertEqual(2, self.m_get_metadata_from_imds.call_count) + + @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") + @mock.patch( + "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" + ) + @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds") + def test_crawl_metadata_on_reprovision_reports_ready( + self, poll_imds_func, m_report_ready, m_write + ): + """If reprovisioning, report ready at the end""" + ovfenv = construct_valid_ovf_env( + platform_settings={"PreprovisionedVm": "True"} + ) + + data = {"ovfcontent": ovfenv, "sys_cfg": {}} + dsrc = self._get_ds(data) + poll_imds_func.return_value = ovfenv + dsrc.crawl_metadata() + self.assertEqual(1, m_report_ready.call_count) + + @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") + @mock.patch( + "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" + ) + @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds") + @mock.patch( + "cloudinit.sources.DataSourceAzure.DataSourceAzure." + "_wait_for_all_nics_ready" + ) + def test_crawl_metadata_waits_for_nic_on_savable_vms( + self, detect_nics, poll_imds_func, report_ready_func, m_write + ): + """If reprovisioning, report ready at the end""" + ovfenv = construct_valid_ovf_env( + platform_settings={ + "PreprovisionedVMType": "Savable", + "PreprovisionedVm": "True", + } + ) + + data = {"ovfcontent": ovfenv, "sys_cfg": {}} + dsrc = self._get_ds(data) + poll_imds_func.return_value = ovfenv + dsrc.crawl_metadata() + self.assertEqual(1, report_ready_func.call_count) + self.assertEqual(1, detect_nics.call_count) + + @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") + @mock.patch( + "cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect" + ) + @mock.patch( + "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready", + return_value=True, + ) + @mock.patch("cloudinit.sources.DataSourceAzure.readurl") + def test_crawl_metadata_on_reprovision_reports_ready_using_lease( + self, m_readurl, m_report_ready, m_media_switch, m_write + ): + """If reprovisioning, report ready using the obtained lease""" + ovfenv = construct_valid_ovf_env( + platform_settings={"PreprovisionedVm": "True"} + ) + + data = {"ovfcontent": ovfenv, "sys_cfg": {}} + dsrc = self._get_ds(data) + + lease = { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + self.m_dhcp.return_value.obtain_lease.return_value = lease + m_media_switch.return_value = None + + reprovision_ovfenv = construct_valid_ovf_env() + m_readurl.return_value = url_helper.StringResponse( + reprovision_ovfenv.encode("utf-8") + ) + + dsrc.crawl_metadata() + + assert m_report_ready.mock_calls == [ + mock.call(), + mock.call(pubkey_info=None), + ] + + def test_waagent_d_has_0700_perms(self): + # we expect /var/lib/waagent to be created 0700 + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertTrue(os.path.isdir(self.waagent_d)) + self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700) + + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + def test_network_config_set_from_imds(self, m_driver): + """Datasource.network_config returns IMDS network data.""" + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + expected_network_config = { + "ethernets": { + "eth0": { + "set-name": "eth0", + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + } + }, + "version": 2, + } + dsrc = self._get_ds(data) + dsrc.get_data() + self.assertEqual(expected_network_config, dsrc.network_config) + + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + def test_network_config_set_from_imds_route_metric_for_secondary_nic( + self, m_driver + ): + """Datasource.network_config adds route-metric to secondary nics.""" + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + expected_network_config = { + "ethernets": { + "eth0": { + "set-name": "eth0", + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + }, + "eth1": { + "set-name": "eth1", + "match": {"macaddress": "22:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 200}, + }, + "eth2": { + "set-name": "eth2", + "match": {"macaddress": "33:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 300}, + }, + }, + "version": 2, + } + imds_data = copy.deepcopy(NETWORK_METADATA) + imds_data["network"]["interface"].append(SECONDARY_INTERFACE) + third_intf = copy.deepcopy(SECONDARY_INTERFACE) + third_intf["macAddress"] = third_intf["macAddress"].replace("22", "33") + third_intf["ipv4"]["subnet"][0]["address"] = "10.0.2.0" + third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6" + imds_data["network"]["interface"].append(third_intf) + + self.m_get_metadata_from_imds.return_value = imds_data + dsrc = self._get_ds(data) + dsrc.get_data() + self.assertEqual(expected_network_config, dsrc.network_config) + + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + def test_network_config_set_from_imds_for_secondary_nic_no_ip( + self, m_driver + ): + """If an IP address is empty then there should no config for it.""" + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + expected_network_config = { + "ethernets": { + "eth0": { + "set-name": "eth0", + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "dhcp6": False, + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + } + }, + "version": 2, + } + imds_data = copy.deepcopy(NETWORK_METADATA) + imds_data["network"]["interface"].append(SECONDARY_INTERFACE_NO_IP) + self.m_get_metadata_from_imds.return_value = imds_data + dsrc = self._get_ds(data) + dsrc.get_data() + self.assertEqual(expected_network_config, dsrc.network_config) + + def test_availability_zone_set_from_imds(self): + """Datasource.availability returns IMDS platformFaultDomain.""" + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + dsrc = self._get_ds(data) + dsrc.get_data() + self.assertEqual("0", dsrc.availability_zone) + + def test_region_set_from_imds(self): + """Datasource.region returns IMDS region location.""" + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + dsrc = self._get_ds(data) + dsrc.get_data() + self.assertEqual("eastus2", dsrc.region) + + def test_sys_cfg_set_never_destroy_ntfs(self): + sys_cfg = { + "datasource": { + "Azure": {"never_destroy_ntfs": "user-supplied-value"} + } + } + data = { + "ovfcontent": construct_valid_ovf_env(data={}), + "sys_cfg": sys_cfg, + } + + dsrc = self._get_ds(data) + ret = self._get_and_setup(dsrc) + self.assertTrue(ret) + self.assertEqual( + dsrc.ds_cfg.get(dsaz.DS_CFG_KEY_PRESERVE_NTFS), + "user-supplied-value", + ) + + def test_username_used(self): + odata = {"HostName": "myhost", "UserName": "myuser"} + data = {"ovfcontent": construct_valid_ovf_env(data=odata)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual( + dsrc.cfg["system_info"]["default_user"]["name"], "myuser" + ) + + def test_password_given(self): + odata = { + "HostName": "myhost", + "UserName": "myuser", + "UserPassword": "mypass", + } + data = {"ovfcontent": construct_valid_ovf_env(data=odata)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertIn("default_user", dsrc.cfg["system_info"]) + defuser = dsrc.cfg["system_info"]["default_user"] + + # default user should be updated username and should not be locked. + self.assertEqual(defuser["name"], odata["UserName"]) + self.assertFalse(defuser["lock_passwd"]) + # passwd is crypt formated string $id$salt$encrypted + # encrypting plaintext with salt value of everything up to final '$' + # should equal that after the '$' + pos = defuser["passwd"].rfind("$") + 1 + self.assertEqual( + defuser["passwd"], + crypt.crypt(odata["UserPassword"], defuser["passwd"][0:pos]), + ) + + # the same hashed value should also be present in cfg['password'] + self.assertEqual(defuser["passwd"], dsrc.cfg["password"]) + + def test_user_not_locked_if_password_redacted(self): + odata = { + "HostName": "myhost", + "UserName": "myuser", + "UserPassword": dsaz.DEF_PASSWD_REDACTION, + } + data = {"ovfcontent": construct_valid_ovf_env(data=odata)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertIn("default_user", dsrc.cfg["system_info"]) + defuser = dsrc.cfg["system_info"]["default_user"] + + # default user should be updated username and should not be locked. + self.assertEqual(defuser["name"], odata["UserName"]) + self.assertIn("lock_passwd", defuser) + self.assertFalse(defuser["lock_passwd"]) + + def test_userdata_plain(self): + mydata = "FOOBAR" + odata = {"UserData": {"text": mydata, "encoding": "plain"}} + data = {"ovfcontent": construct_valid_ovf_env(data=odata)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(decode_binary(dsrc.userdata_raw), mydata) + + def test_userdata_found(self): + mydata = "FOOBAR" + odata = {"UserData": {"text": b64e(mydata), "encoding": "base64"}} + data = {"ovfcontent": construct_valid_ovf_env(data=odata)} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(dsrc.userdata_raw, mydata.encode("utf-8")) + + def test_default_ephemeral_configs_ephemeral_exists(self): + # make sure the ephemeral configs are correct if disk present + odata = {} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } + + orig_exists = dsaz.os.path.exists + + def changed_exists(path): + return ( + True if path == dsaz.RESOURCE_DISK_PATH else orig_exists(path) + ) + + with mock.patch(MOCKPATH + "os.path.exists", new=changed_exists): + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + cfg = dsrc.get_config_obj() + + self.assertEqual( + dsrc.device_name_to_device("ephemeral0"), + dsaz.RESOURCE_DISK_PATH, + ) + assert "disk_setup" in cfg + assert "fs_setup" in cfg + self.assertIsInstance(cfg["disk_setup"], dict) + self.assertIsInstance(cfg["fs_setup"], list) + + def test_default_ephemeral_configs_ephemeral_does_not_exist(self): + # make sure the ephemeral configs are correct if disk not present + odata = {} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } + + orig_exists = dsaz.os.path.exists + + def changed_exists(path): + return ( + False if path == dsaz.RESOURCE_DISK_PATH else orig_exists(path) + ) + + with mock.patch(MOCKPATH + "os.path.exists", new=changed_exists): + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + cfg = dsrc.get_config_obj() + + assert "disk_setup" not in cfg + assert "fs_setup" not in cfg + + def test_provide_disk_aliases(self): + # Make sure that user can affect disk aliases + dscfg = {"disk_aliases": {"ephemeral0": "/dev/sdc"}} + odata = { + "HostName": "myhost", + "UserName": "myuser", + "dscfg": {"text": b64e(yaml.dump(dscfg)), "encoding": "base64"}, + } + usercfg = { + "disk_setup": { + "/dev/sdc": {"something": "..."}, + "ephemeral0": False, + } + } + userdata = "#cloud-config" + yaml.dump(usercfg) + "\n" + + ovfcontent = construct_valid_ovf_env(data=odata, userdata=userdata) + data = {"ovfcontent": ovfcontent, "sys_cfg": {}} + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + cfg = dsrc.get_config_obj() + self.assertTrue(cfg) + + def test_userdata_arrives(self): + userdata = "This is my user-data" + xml = construct_valid_ovf_env(data={}, userdata=userdata) + data = {"ovfcontent": xml} + dsrc = self._get_ds(data) + dsrc.get_data() + + self.assertEqual(userdata.encode("us-ascii"), dsrc.userdata_raw) + + def test_password_redacted_in_ovf(self): + odata = { + "HostName": "myhost", + "UserName": "myuser", + "UserPassword": "mypass", + } + data = {"ovfcontent": construct_valid_ovf_env(data=odata)} + dsrc = self._get_ds(data) + ret = dsrc.get_data() + + self.assertTrue(ret) + ovf_env_path = os.path.join(self.waagent_d, "ovf-env.xml") + + # The XML should not be same since the user password is redacted + on_disk_ovf = load_file(ovf_env_path) + self.xml_notequals(data["ovfcontent"], on_disk_ovf) + + # Make sure that the redacted password on disk is not used by CI + self.assertNotEqual( + dsrc.cfg.get("password"), dsaz.DEF_PASSWD_REDACTION + ) + + # Make sure that the password was really encrypted + et = ET.fromstring(on_disk_ovf) + for elem in et.iter(): + if "UserPassword" in elem.tag: + self.assertEqual(dsaz.DEF_PASSWD_REDACTION, elem.text) + + def test_ovf_env_arrives_in_waagent_dir(self): + xml = construct_valid_ovf_env(data={}, userdata="FOODATA") + dsrc = self._get_ds({"ovfcontent": xml}) + dsrc.get_data() + + # 'data_dir' is '/var/lib/waagent' (walinux-agent's state dir) + # we expect that the ovf-env.xml file is copied there. + ovf_env_path = os.path.join(self.waagent_d, "ovf-env.xml") + self.assertTrue(os.path.exists(ovf_env_path)) + self.xml_equals(xml, load_file(ovf_env_path)) + + def test_ovf_can_include_unicode(self): + xml = construct_valid_ovf_env(data={}) + xml = "\ufeff{0}".format(xml) + dsrc = self._get_ds({"ovfcontent": xml}) + dsrc.get_data() + + def test_dsaz_report_ready_returns_true_when_report_succeeds(self): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + assert dsrc._report_ready() == [] + + @mock.patch(MOCKPATH + "report_diagnostic_event") + def test_dsaz_report_ready_failure_reports_telemetry(self, m_report_diag): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + self.m_get_metadata_from_fabric.side_effect = Exception("foo") + + with pytest.raises(Exception): + dsrc._report_ready() + + assert m_report_diag.mock_calls == [ + mock.call( + "Error communicating with Azure fabric; " + "You may experience connectivity issues: foo", + logger_func=dsaz.LOG.warning, + ) + ] + + def test_dsaz_report_failure_returns_true_when_report_succeeds(self): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + + with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: + # mock crawl metadata failure to cause report failure + m_crawl_metadata.side_effect = Exception + + self.assertTrue(dsrc._report_failure()) + self.assertEqual(1, self.m_report_failure_to_fabric.call_count) + + def test_dsaz_report_failure_returns_false_and_does_not_propagate_exc( + self, + ): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + + with mock.patch.object( + dsrc, "crawl_metadata" + ) as m_crawl_metadata, mock.patch.object( + dsrc, "_ephemeral_dhcp_ctx" + ) as m_ephemeral_dhcp_ctx, mock.patch.object( + dsrc.distro.networking, "is_up" + ) as m_dsrc_distro_networking_is_up: + # mock crawl metadata failure to cause report failure + m_crawl_metadata.side_effect = Exception + + # setup mocks to allow using cached ephemeral dhcp lease + m_dsrc_distro_networking_is_up.return_value = True + test_lease_dhcp_option_245 = "test_lease_dhcp_option_245" + test_lease = {"unknown-245": test_lease_dhcp_option_245} + m_ephemeral_dhcp_ctx.lease = test_lease + + # We expect 2 calls to report_failure_to_fabric, + # because we try 2 different methods of calling report failure. + # The different methods are attempted in the following order: + # 1. Using cached ephemeral dhcp context to report failure to Azure + # 2. Using new ephemeral dhcp to report failure to Azure + self.m_report_failure_to_fabric.side_effect = Exception + self.assertFalse(dsrc._report_failure()) + self.assertEqual(2, self.m_report_failure_to_fabric.call_count) + + def test_dsaz_report_failure_description_msg(self): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + + with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: + # mock crawl metadata failure to cause report failure + m_crawl_metadata.side_effect = Exception + + test_msg = "Test report failure description message" + self.assertTrue(dsrc._report_failure(description=test_msg)) + self.m_report_failure_to_fabric.assert_called_once_with( + dhcp_opts=mock.ANY, description=test_msg + ) + + def test_dsaz_report_failure_no_description_msg(self): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + + with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: + m_crawl_metadata.side_effect = Exception + + self.assertTrue(dsrc._report_failure()) # no description msg + self.m_report_failure_to_fabric.assert_called_once_with( + dhcp_opts=mock.ANY, description=None + ) + + def test_dsaz_report_failure_uses_cached_ephemeral_dhcp_ctx_lease(self): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + + with mock.patch.object( + dsrc, "crawl_metadata" + ) as m_crawl_metadata, mock.patch.object( + dsrc, "_wireserver_endpoint", return_value="test-ep" + ) as m_wireserver_endpoint: + # mock crawl metadata failure to cause report failure + m_crawl_metadata.side_effect = Exception + + self.assertTrue(dsrc._report_failure()) + + # ensure called with cached ephemeral dhcp lease option 245 + self.m_report_failure_to_fabric.assert_called_once_with( + description=mock.ANY, dhcp_opts=m_wireserver_endpoint + ) + + def test_dsaz_report_failure_no_net_uses_new_ephemeral_dhcp_lease(self): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + + with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: + # mock crawl metadata failure to cause report failure + m_crawl_metadata.side_effect = Exception + + test_lease_dhcp_option_245 = "test_lease_dhcp_option_245" + test_lease = { + "unknown-245": test_lease_dhcp_option_245, + "interface": "eth0", + } + self.m_dhcp.return_value.obtain_lease.return_value = test_lease + + self.assertTrue(dsrc._report_failure()) + + # ensure called with the newly discovered + # ephemeral dhcp lease option 245 + self.m_report_failure_to_fabric.assert_called_once_with( + description=mock.ANY, dhcp_opts=test_lease_dhcp_option_245 + ) + + def test_exception_fetching_fabric_data_doesnt_propagate(self): + """Errors communicating with fabric should warn, but return True.""" + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + self.m_get_metadata_from_fabric.side_effect = Exception + ret = self._get_and_setup(dsrc) + self.assertTrue(ret) + + def test_fabric_data_included_in_metadata(self): + dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + self.m_get_metadata_from_fabric.return_value = ["ssh-key-value"] + ret = self._get_and_setup(dsrc) + self.assertTrue(ret) + self.assertEqual(["ssh-key-value"], dsrc.metadata["public-keys"]) + + def test_instance_id_case_insensitive(self): + """Return the previous iid when current is a case-insensitive match.""" + lower_iid = EXAMPLE_UUID.lower() + upper_iid = EXAMPLE_UUID.upper() + # lowercase current UUID + ds = self._get_ds( + {"ovfcontent": construct_valid_ovf_env()}, instance_id=lower_iid + ) + # UPPERCASE previous + write_file( + os.path.join(self.paths.cloud_dir, "data", "instance-id"), + upper_iid, + ) + ds.get_data() + self.assertEqual(upper_iid, ds.metadata["instance-id"]) + + # UPPERCASE current UUID + ds = self._get_ds( + {"ovfcontent": construct_valid_ovf_env()}, instance_id=upper_iid + ) + # lowercase previous + write_file( + os.path.join(self.paths.cloud_dir, "data", "instance-id"), + lower_iid, + ) + ds.get_data() + self.assertEqual(lower_iid, ds.metadata["instance-id"]) + + def test_instance_id_endianness(self): + """Return the previous iid when dmi uuid is the byteswapped iid.""" + ds = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + # byte-swapped previous + write_file( + os.path.join(self.paths.cloud_dir, "data", "instance-id"), + "544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8", + ) + ds.get_data() + self.assertEqual( + "544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8", ds.metadata["instance-id"] + ) + # not byte-swapped previous + write_file( + os.path.join(self.paths.cloud_dir, "data", "instance-id"), + "644CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8", + ) + ds.get_data() + self.assertEqual(self.instance_id, ds.metadata["instance-id"]) + + def test_instance_id_from_dmidecode_used(self): + ds = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + ds.get_data() + self.assertEqual(self.instance_id, ds.metadata["instance-id"]) + + def test_instance_id_from_dmidecode_used_for_builtin(self): + ds = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) + ds.get_data() + self.assertEqual(self.instance_id, ds.metadata["instance-id"]) + + @mock.patch(MOCKPATH + "util.is_FreeBSD") + @mock.patch(MOCKPATH + "_check_freebsd_cdrom") + def test_list_possible_azure_ds(self, m_check_fbsd_cdrom, m_is_FreeBSD): + """On FreeBSD, possible devs should show /dev/cd0.""" + m_is_FreeBSD.return_value = True + m_check_fbsd_cdrom.return_value = True + possible_ds = [] + for src in dsaz.list_possible_azure_ds("seed_dir", "cache_dir"): + possible_ds.append(src) + self.assertEqual( + possible_ds, + [ + "seed_dir", + dsaz.DEFAULT_PROVISIONING_ISO_DEV, + "/dev/cd0", + "cache_dir", + ], + ) + self.assertEqual( + [mock.call("/dev/cd0")], m_check_fbsd_cdrom.call_args_list + ) + + @mock.patch( + "cloudinit.sources.DataSourceAzure.device_driver", return_value=None + ) + @mock.patch("cloudinit.net.generate_fallback_config") + def test_imds_network_config(self, mock_fallback, m_driver): + """Network config is generated from IMDS network data when present.""" + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + + expected_cfg = { + "ethernets": { + "eth0": { + "dhcp4": True, + "dhcp4-overrides": {"route-metric": 100}, + "dhcp6": False, + "match": {"macaddress": "00:0d:3a:04:75:98"}, + "set-name": "eth0", + } + }, + "version": 2, + } + + self.assertEqual(expected_cfg, dsrc.network_config) + mock_fallback.assert_not_called() + + @mock.patch("cloudinit.net.get_interface_mac") + @mock.patch("cloudinit.net.get_devicelist") + @mock.patch("cloudinit.net.device_driver") + @mock.patch("cloudinit.net.generate_fallback_config") + def test_imds_network_ignored_when_apply_network_config_false( + self, mock_fallback, mock_dd, mock_devlist, mock_get_mac + ): + """When apply_network_config is False, use fallback instead of IMDS.""" + sys_cfg = {"datasource": {"Azure": {"apply_network_config": False}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + fallback_config = { + "version": 1, + "config": [ + { + "type": "physical", + "name": "eth0", + "mac_address": "00:11:22:33:44:55", + "params": {"driver": "hv_netsvc"}, + "subnets": [{"type": "dhcp"}], + } + ], + } + mock_fallback.return_value = fallback_config + + mock_devlist.return_value = ["eth0"] + mock_dd.return_value = ["hv_netsvc"] + mock_get_mac.return_value = "00:11:22:33:44:55" + + dsrc = self._get_ds(data) + self.assertTrue(dsrc.get_data()) + self.assertEqual(dsrc.network_config, fallback_config) + + @mock.patch("cloudinit.net.get_interface_mac") + @mock.patch("cloudinit.net.get_devicelist") + @mock.patch("cloudinit.net.device_driver") + @mock.patch("cloudinit.net.generate_fallback_config", autospec=True) + def test_fallback_network_config( + self, mock_fallback, mock_dd, mock_devlist, mock_get_mac + ): + """On absent IMDS network data, generate network fallback config.""" + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } + + fallback_config = { + "version": 1, + "config": [ + { + "type": "physical", + "name": "eth0", + "mac_address": "00:11:22:33:44:55", + "params": {"driver": "hv_netsvc"}, + "subnets": [{"type": "dhcp"}], + } + ], + } + mock_fallback.return_value = fallback_config + + mock_devlist.return_value = ["eth0"] + mock_dd.return_value = ["hv_netsvc"] + mock_get_mac.return_value = "00:11:22:33:44:55" + + dsrc = self._get_ds(data) + # Represent empty response from network imds + self.m_get_metadata_from_imds.return_value = {} + ret = dsrc.get_data() + self.assertTrue(ret) + + netconfig = dsrc.network_config + self.assertEqual(netconfig, fallback_config) + mock_fallback.assert_called_with( + blacklist_drivers=["mlx4_core", "mlx5_core"], config_driver=True + ) + + @mock.patch(MOCKPATH + "net.get_interfaces", autospec=True) + def test_blacklist_through_distro(self, m_net_get_interfaces): + """Verify Azure DS updates blacklist drivers in the distro's + networking object.""" + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": {}, + } + + distro_cls = distros.fetch("ubuntu") + distro = distro_cls("ubuntu", {}, self.paths) + dsrc = self._get_ds(data, distro=distro) + dsrc.get_data() + self.assertEqual( + distro.networking.blacklist_drivers, dsaz.BLACKLIST_DRIVERS + ) + + distro.networking.get_interfaces_by_mac() + self.m_get_interfaces.assert_called_with( + blacklist_drivers=dsaz.BLACKLIST_DRIVERS + ) + + @mock.patch( + "cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates" + ) + def test_get_public_ssh_keys_with_imds(self, m_parse_certificates): + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + dsrc = self._get_ds(data) + dsrc.get_data() + dsrc.setup(True) + ssh_keys = dsrc.get_public_ssh_keys() + self.assertEqual(ssh_keys, ["ssh-rsa key1"]) + self.assertEqual(m_parse_certificates.call_count, 0) + + def test_key_without_crlf_valid(self): + test_key = "ssh-rsa somerandomkeystuff some comment" + assert True is dsaz._key_is_openssh_formatted(test_key) + + def test_key_with_crlf_invalid(self): + test_key = "ssh-rsa someran\r\ndomkeystuff some comment" + assert False is dsaz._key_is_openssh_formatted(test_key) + + def test_key_endswith_crlf_valid(self): + test_key = "ssh-rsa somerandomkeystuff some comment\r\n" + assert True is dsaz._key_is_openssh_formatted(test_key) + + @mock.patch( + "cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates" + ) + @mock.patch(MOCKPATH + "get_metadata_from_imds") + def test_get_public_ssh_keys_with_no_openssh_format( + self, m_get_metadata_from_imds, m_parse_certificates + ): + imds_data = copy.deepcopy(NETWORK_METADATA) + imds_data["compute"]["publicKeys"][0]["keyData"] = "no-openssh-format" + m_get_metadata_from_imds.return_value = imds_data + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + dsrc = self._get_ds(data) + dsrc.get_data() + dsrc.setup(True) + ssh_keys = dsrc.get_public_ssh_keys() + self.assertEqual(ssh_keys, []) + self.assertEqual(m_parse_certificates.call_count, 0) + + @mock.patch(MOCKPATH + "get_metadata_from_imds") + def test_get_public_ssh_keys_without_imds(self, m_get_metadata_from_imds): + m_get_metadata_from_imds.return_value = dict() + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + dsrc = self._get_ds(data) + dsaz.get_metadata_from_fabric.return_value = ["key2"] + dsrc.get_data() + dsrc.setup(True) + ssh_keys = dsrc.get_public_ssh_keys() + self.assertEqual(ssh_keys, ["key2"]) + + @mock.patch(MOCKPATH + "get_metadata_from_imds") + def test_imds_api_version_wanted_nonexistent( + self, m_get_metadata_from_imds + ): + def get_metadata_from_imds_side_eff(*args, **kwargs): + if kwargs["api_version"] == dsaz.IMDS_VER_WANT: + raise url_helper.UrlError("No IMDS version", code=400) + return NETWORK_METADATA + + m_get_metadata_from_imds.side_effect = get_metadata_from_imds_side_eff + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + dsrc = self._get_ds(data) + dsrc.get_data() + self.assertIsNotNone(dsrc.metadata) + + assert m_get_metadata_from_imds.mock_calls == [ + mock.call( + retries=0, + md_type=dsaz.MetadataType.ALL, + api_version="2021-08-01", + exc_cb=mock.ANY, + ), + mock.call( + retries=10, + md_type=dsaz.MetadataType.ALL, + api_version="2019-06-01", + exc_cb=mock.ANY, + infinite=False, + ), + ] + + @mock.patch( + MOCKPATH + "get_metadata_from_imds", return_value=NETWORK_METADATA + ) + def test_imds_api_version_wanted_exists(self, m_get_metadata_from_imds): + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + dsrc = self._get_ds(data) + dsrc.get_data() + self.assertIsNotNone(dsrc.metadata) + + assert m_get_metadata_from_imds.mock_calls == [ + mock.call( + retries=0, + md_type=dsaz.MetadataType.ALL, + api_version="2021-08-01", + exc_cb=mock.ANY, + ) + ] + + @mock.patch(MOCKPATH + "get_metadata_from_imds") + def test_hostname_from_imds(self, m_get_metadata_from_imds): + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + imds_data_with_os_profile = copy.deepcopy(NETWORK_METADATA) + imds_data_with_os_profile["compute"]["osProfile"] = dict( + adminUsername="username1", + computerName="hostname1", + disablePasswordAuthentication="true", + ) + m_get_metadata_from_imds.return_value = imds_data_with_os_profile + dsrc = self._get_ds(data) + dsrc.get_data() + self.assertEqual(dsrc.metadata["local-hostname"], "hostname1") + + @mock.patch(MOCKPATH + "get_metadata_from_imds") + def test_username_from_imds(self, m_get_metadata_from_imds): + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + imds_data_with_os_profile = copy.deepcopy(NETWORK_METADATA) + imds_data_with_os_profile["compute"]["osProfile"] = dict( + adminUsername="username1", + computerName="hostname1", + disablePasswordAuthentication="true", + ) + m_get_metadata_from_imds.return_value = imds_data_with_os_profile + dsrc = self._get_ds(data) + dsrc.get_data() + self.assertEqual( + dsrc.cfg["system_info"]["default_user"]["name"], "username1" + ) + + @mock.patch(MOCKPATH + "get_metadata_from_imds") + def test_disable_password_from_imds(self, m_get_metadata_from_imds): + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + imds_data_with_os_profile = copy.deepcopy(NETWORK_METADATA) + imds_data_with_os_profile["compute"]["osProfile"] = dict( + adminUsername="username1", + computerName="hostname1", + disablePasswordAuthentication="true", + ) + m_get_metadata_from_imds.return_value = imds_data_with_os_profile + dsrc = self._get_ds(data) + dsrc.get_data() + self.assertTrue(dsrc.metadata["disable_password"]) + + @mock.patch(MOCKPATH + "get_metadata_from_imds") + def test_userdata_from_imds(self, m_get_metadata_from_imds): + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + odata = {"HostName": "myhost", "UserName": "myuser"} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + userdata = "userdataImds" + imds_data = copy.deepcopy(NETWORK_METADATA) + imds_data["compute"]["osProfile"] = dict( + adminUsername="username1", + computerName="hostname1", + disablePasswordAuthentication="true", + ) + imds_data["compute"]["userData"] = b64e(userdata) + m_get_metadata_from_imds.return_value = imds_data + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(dsrc.userdata_raw, userdata.encode("utf-8")) + + @mock.patch(MOCKPATH + "get_metadata_from_imds") + def test_userdata_from_imds_with_customdata_from_OVF( + self, m_get_metadata_from_imds + ): + userdataOVF = "userdataOVF" + odata = { + "HostName": "myhost", + "UserName": "myuser", + "UserData": {"text": b64e(userdataOVF), "encoding": "base64"}, + } + sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} + data = { + "ovfcontent": construct_valid_ovf_env(data=odata), + "sys_cfg": sys_cfg, + } + + userdataImds = "userdataImds" + imds_data = copy.deepcopy(NETWORK_METADATA) + imds_data["compute"]["osProfile"] = dict( + adminUsername="username1", + computerName="hostname1", + disablePasswordAuthentication="true", + ) + imds_data["compute"]["userData"] = b64e(userdataImds) + m_get_metadata_from_imds.return_value = imds_data + dsrc = self._get_ds(data) + ret = dsrc.get_data() + self.assertTrue(ret) + self.assertEqual(dsrc.userdata_raw, userdataOVF.encode("utf-8")) + + +class TestLoadAzureDsDir(CiTestCase): + """Tests for load_azure_ds_dir.""" + + def setUp(self): + self.source_dir = self.tmp_dir() + super(TestLoadAzureDsDir, self).setUp() + + def test_missing_ovf_env_xml_raises_non_azure_datasource_error(self): + """load_azure_ds_dir raises an error When ovf-env.xml doesn't exit.""" + with self.assertRaises(dsaz.NonAzureDataSource) as context_manager: + dsaz.load_azure_ds_dir(self.source_dir) + self.assertEqual( + "No ovf-env file found", str(context_manager.exception) + ) + + def test_wb_invalid_ovf_env_xml_calls_read_azure_ovf(self): + """load_azure_ds_dir calls read_azure_ovf to parse the xml.""" + ovf_path = os.path.join(self.source_dir, "ovf-env.xml") + with open(ovf_path, "wb") as stream: + stream.write(b"invalid xml") + with self.assertRaises(dsaz.BrokenAzureDataSource) as context_manager: + dsaz.load_azure_ds_dir(self.source_dir) + self.assertEqual( + "Invalid ovf-env.xml: syntax error: line 1, column 0", + str(context_manager.exception), + ) + + +class TestReadAzureOvf(CiTestCase): + def test_invalid_xml_raises_non_azure_ds(self): + invalid_xml = "<foo>" + construct_valid_ovf_env(data={}) + self.assertRaises( + dsaz.BrokenAzureDataSource, dsaz.read_azure_ovf, invalid_xml + ) + + def test_load_with_pubkeys(self): + mypklist = [{"fingerprint": "fp1", "path": "path1", "value": ""}] + pubkeys = [(x["fingerprint"], x["path"], x["value"]) for x in mypklist] + content = construct_valid_ovf_env(pubkeys=pubkeys) + (_md, _ud, cfg) = dsaz.read_azure_ovf(content) + for mypk in mypklist: + self.assertIn(mypk, cfg["_pubkeys"]) + + +class TestCanDevBeReformatted(CiTestCase): + warning_file = "dataloss_warning_readme.txt" + + def _domock(self, mockpath, sattr=None): + patcher = mock.patch(mockpath) + setattr(self, sattr, patcher.start()) + self.addCleanup(patcher.stop) + + def patchup(self, devs): + bypath = {} + for path, data in devs.items(): + bypath[path] = data + if "realpath" in data: + bypath[data["realpath"]] = data + for ppath, pdata in data.get("partitions", {}).items(): + bypath[ppath] = pdata + if "realpath" in data: + bypath[pdata["realpath"]] = pdata + + def realpath(d): + return bypath[d].get("realpath", d) + + def partitions_on_device(devpath): + parts = bypath.get(devpath, {}).get("partitions", {}) + ret = [] + for path, data in parts.items(): + ret.append((data.get("num"), realpath(path))) + # return sorted by partition number + return sorted(ret, key=lambda d: d[0]) + + def mount_cb(device, callback, mtype, update_env_for_mount): + self.assertEqual("ntfs", mtype) + self.assertEqual("C", update_env_for_mount.get("LANG")) + p = self.tmp_dir() + for f in bypath.get(device).get("files", []): + write_file(os.path.join(p, f), content=f) + return callback(p) + + def has_ntfs_fs(device): + return bypath.get(device, {}).get("fs") == "ntfs" + + p = MOCKPATH + self._domock(p + "_partitions_on_device", "m_partitions_on_device") + self._domock(p + "_has_ntfs_filesystem", "m_has_ntfs_filesystem") + self._domock(p + "util.mount_cb", "m_mount_cb") + self._domock(p + "os.path.realpath", "m_realpath") + self._domock(p + "os.path.exists", "m_exists") + self._domock(p + "util.SeLinuxGuard", "m_selguard") + + self.m_exists.side_effect = lambda p: p in bypath + self.m_realpath.side_effect = realpath + self.m_has_ntfs_filesystem.side_effect = has_ntfs_fs + self.m_mount_cb.side_effect = mount_cb + self.m_partitions_on_device.side_effect = partitions_on_device + self.m_selguard.__enter__ = mock.Mock(return_value=False) + self.m_selguard.__exit__ = mock.Mock() + + def test_three_partitions_is_false(self): + """A disk with 3 partitions can not be formatted.""" + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1}, + "/dev/sda2": {"num": 2}, + "/dev/sda3": {"num": 3}, + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) + self.assertFalse(value) + self.assertIn("3 or more", msg.lower()) + + def test_no_partitions_is_false(self): + """A disk with no partitions can not be formatted.""" + self.patchup({"/dev/sda": {}}) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) + self.assertFalse(value) + self.assertIn("not partitioned", msg.lower()) + + def test_two_partitions_not_ntfs_false(self): + """2 partitions and 2nd not ntfs can not be formatted.""" + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1}, + "/dev/sda2": {"num": 2, "fs": "ext4", "files": []}, + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) + self.assertFalse(value) + self.assertIn("not ntfs", msg.lower()) + + def test_two_partitions_ntfs_populated_false(self): + """2 partitions and populated ntfs fs on 2nd can not be formatted.""" + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1}, + "/dev/sda2": { + "num": 2, + "fs": "ntfs", + "files": ["secret.txt"], + }, + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) + self.assertFalse(value) + self.assertIn("files on it", msg.lower()) + + def test_two_partitions_ntfs_empty_is_true(self): + """2 partitions and empty ntfs fs on 2nd can be formatted.""" + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1}, + "/dev/sda2": {"num": 2, "fs": "ntfs", "files": []}, + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) + self.assertTrue(value) + self.assertIn("safe for", msg.lower()) + + def test_one_partition_not_ntfs_false(self): + """1 partition witih fs other than ntfs can not be formatted.""" + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1, "fs": "zfs"}, + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) + self.assertFalse(value) + self.assertIn("not ntfs", msg.lower()) + + def test_one_partition_ntfs_populated_false(self): + """1 mountable ntfs partition with many files can not be formatted.""" + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": { + "num": 1, + "fs": "ntfs", + "files": ["file1.txt", "file2.exe"], + }, + } + } + } + ) + with mock.patch.object(dsaz.LOG, "warning") as warning: + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) + wmsg = warning.call_args[0][0] + self.assertIn( + "looks like you're using NTFS on the ephemeral disk", wmsg + ) + self.assertFalse(value) + self.assertIn("files on it", msg.lower()) + + def test_one_partition_ntfs_empty_is_true(self): + """1 mountable ntfs partition and no files can be formatted.""" + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1, "fs": "ntfs", "files": []} + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) + self.assertTrue(value) + self.assertIn("safe for", msg.lower()) + + def test_one_partition_ntfs_empty_with_dataloss_file_is_true(self): + """1 mountable ntfs partition and only warn file can be formatted.""" + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": { + "num": 1, + "fs": "ntfs", + "files": ["dataloss_warning_readme.txt"], + } + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) + self.assertTrue(value) + self.assertIn("safe for", msg.lower()) + + def test_one_partition_through_realpath_is_true(self): + """A symlink to a device with 1 ntfs partition can be formatted.""" + epath = "/dev/disk/cloud/azure_resource" + self.patchup( + { + epath: { + "realpath": "/dev/sdb", + "partitions": { + epath + + "-part1": { + "num": 1, + "fs": "ntfs", + "files": [self.warning_file], + "realpath": "/dev/sdb1", + } + }, + } + } + ) + value, msg = dsaz.can_dev_be_reformatted(epath, preserve_ntfs=False) + self.assertTrue(value) + self.assertIn("safe for", msg.lower()) + + def test_three_partition_through_realpath_is_false(self): + """A symlink to a device with 3 partitions can not be formatted.""" + epath = "/dev/disk/cloud/azure_resource" + self.patchup( + { + epath: { + "realpath": "/dev/sdb", + "partitions": { + epath + + "-part1": { + "num": 1, + "fs": "ntfs", + "files": [self.warning_file], + "realpath": "/dev/sdb1", + }, + epath + + "-part2": { + "num": 2, + "fs": "ext3", + "realpath": "/dev/sdb2", + }, + epath + + "-part3": { + "num": 3, + "fs": "ext", + "realpath": "/dev/sdb3", + }, + }, + } + } + ) + value, msg = dsaz.can_dev_be_reformatted(epath, preserve_ntfs=False) + self.assertFalse(value) + self.assertIn("3 or more", msg.lower()) + + def test_ntfs_mount_errors_true(self): + """can_dev_be_reformatted does not fail if NTFS is unknown fstype.""" + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": {"num": 1, "fs": "ntfs", "files": []} + } + } + } + ) + + error_msgs = [ + "Stderr: mount: unknown filesystem type 'ntfs'", # RHEL + "Stderr: mount: /dev/sdb1: unknown filesystem type 'ntfs'", # SLES + ] + + for err_msg in error_msgs: + self.m_mount_cb.side_effect = MountFailedError( + "Failed mounting %s to %s due to: \nUnexpected.\n%s" + % ("/dev/sda", "/fake-tmp/dir", err_msg) + ) + + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=False + ) + self.assertTrue(value) + self.assertIn("cannot mount NTFS, assuming", msg) + + def test_never_destroy_ntfs_config_false(self): + """Normally formattable situation with never_destroy_ntfs set.""" + self.patchup( + { + "/dev/sda": { + "partitions": { + "/dev/sda1": { + "num": 1, + "fs": "ntfs", + "files": ["dataloss_warning_readme.txt"], + } + } + } + } + ) + value, msg = dsaz.can_dev_be_reformatted( + "/dev/sda", preserve_ntfs=True + ) + self.assertFalse(value) + self.assertIn( + "config says to never destroy NTFS " + "(datasource.Azure.never_destroy_ntfs)", + msg, + ) + + +class TestClearCachedData(CiTestCase): + def test_clear_cached_attrs_clears_imds(self): + """All class attributes are reset to defaults, including imds data.""" + tmp = self.tmp_dir() + paths = helpers.Paths({"cloud_dir": tmp, "run_dir": tmp}) + dsrc = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=paths) + clean_values = [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds] + dsrc.metadata = "md" + dsrc.userdata = "ud" + dsrc._metadata_imds = "imds" + dsrc._dirty_cache = True + dsrc.clear_cached_attrs() + self.assertEqual( + [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds], clean_values + ) + + +class TestAzureNetExists(CiTestCase): + def test_azure_net_must_exist_for_legacy_objpkl(self): + """DataSourceAzureNet must exist for old obj.pkl files + that reference it.""" + self.assertTrue(hasattr(dsaz, "DataSourceAzureNet")) + + +class TestPreprovisioningReadAzureOvfFlag(CiTestCase): + def test_read_azure_ovf_with_true_flag(self): + """The read_azure_ovf method should set the PreprovisionedVM + cfg flag if the proper setting is present.""" + content = construct_valid_ovf_env( + platform_settings={"PreprovisionedVm": "True"} + ) + ret = dsaz.read_azure_ovf(content) + cfg = ret[2] + self.assertTrue(cfg["PreprovisionedVm"]) + + def test_read_azure_ovf_with_false_flag(self): + """The read_azure_ovf method should set the PreprovisionedVM + cfg flag to false if the proper setting is false.""" + content = construct_valid_ovf_env( + platform_settings={"PreprovisionedVm": "False"} + ) + ret = dsaz.read_azure_ovf(content) + cfg = ret[2] + self.assertFalse(cfg["PreprovisionedVm"]) + + def test_read_azure_ovf_without_flag(self): + """The read_azure_ovf method should not set the + PreprovisionedVM cfg flag.""" + content = construct_valid_ovf_env() + ret = dsaz.read_azure_ovf(content) + cfg = ret[2] + self.assertFalse(cfg["PreprovisionedVm"]) + self.assertEqual(None, cfg["PreprovisionedVMType"]) + + def test_read_azure_ovf_with_running_type(self): + """The read_azure_ovf method should set PreprovisionedVMType + cfg flag to Running.""" + content = construct_valid_ovf_env( + platform_settings={ + "PreprovisionedVMType": "Running", + "PreprovisionedVm": "True", + } + ) + ret = dsaz.read_azure_ovf(content) + cfg = ret[2] + self.assertTrue(cfg["PreprovisionedVm"]) + self.assertEqual("Running", cfg["PreprovisionedVMType"]) + + def test_read_azure_ovf_with_savable_type(self): + """The read_azure_ovf method should set PreprovisionedVMType + cfg flag to Savable.""" + content = construct_valid_ovf_env( + platform_settings={ + "PreprovisionedVMType": "Savable", + "PreprovisionedVm": "True", + } + ) + ret = dsaz.read_azure_ovf(content) + cfg = ret[2] + self.assertTrue(cfg["PreprovisionedVm"]) + self.assertEqual("Savable", cfg["PreprovisionedVMType"]) + + +@pytest.mark.parametrize( + "ovf_cfg,imds_md,pps_type", + [ + ( + {"PreprovisionedVm": False, "PreprovisionedVMType": None}, + {}, + dsaz.PPSType.NONE, + ), + ( + {"PreprovisionedVm": True, "PreprovisionedVMType": "Running"}, + {}, + dsaz.PPSType.RUNNING, + ), + ( + {"PreprovisionedVm": True, "PreprovisionedVMType": "Savable"}, + {}, + dsaz.PPSType.SAVABLE, + ), + ( + {"PreprovisionedVm": True}, + {}, + dsaz.PPSType.RUNNING, + ), + ( + {}, + {"extended": {"compute": {"ppsType": "None"}}}, + dsaz.PPSType.NONE, + ), + ( + {}, + {"extended": {"compute": {"ppsType": "Running"}}}, + dsaz.PPSType.RUNNING, + ), + ( + {}, + {"extended": {"compute": {"ppsType": "Savable"}}}, + dsaz.PPSType.SAVABLE, + ), + ( + {"PreprovisionedVm": False, "PreprovisionedVMType": None}, + {"extended": {"compute": {"ppsType": "None"}}}, + dsaz.PPSType.NONE, + ), + ( + {"PreprovisionedVm": True, "PreprovisionedVMType": "Running"}, + {"extended": {"compute": {"ppsType": "Running"}}}, + dsaz.PPSType.RUNNING, + ), + ( + {"PreprovisionedVm": True, "PreprovisionedVMType": "Savable"}, + {"extended": {"compute": {"ppsType": "Savable"}}}, + dsaz.PPSType.SAVABLE, + ), + ( + {"PreprovisionedVm": True}, + {"extended": {"compute": {"ppsType": "Running"}}}, + dsaz.PPSType.RUNNING, + ), + ], +) +class TestDeterminePPSTypeScenarios: + @mock.patch("os.path.isfile", return_value=False) + def test_determine_pps_without_reprovision_marker( + self, is_file, azure_ds, ovf_cfg, imds_md, pps_type + ): + assert azure_ds._determine_pps_type(ovf_cfg, imds_md) == pps_type + + @mock.patch("os.path.isfile", return_value=True) + def test_determine_pps_with_reprovision_marker( + self, is_file, azure_ds, ovf_cfg, imds_md, pps_type + ): + assert ( + azure_ds._determine_pps_type(ovf_cfg, imds_md) + == dsaz.PPSType.UNKNOWN + ) + assert is_file.mock_calls == [mock.call(dsaz.REPROVISION_MARKER_FILE)] + + +@mock.patch("os.path.isfile", return_value=False) +class TestReprovision(CiTestCase): + def setUp(self): + super(TestReprovision, self).setUp() + tmp = self.tmp_dir() + self.waagent_d = self.tmp_path("/var/lib/waagent", tmp) + self.paths = helpers.Paths({"cloud_dir": tmp}) + dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d + + @mock.patch(MOCKPATH + "DataSourceAzure._poll_imds") + def test_reprovision_calls__poll_imds(self, _poll_imds, isfile): + """_reprovision will poll IMDS.""" + isfile.return_value = False + hostname = "myhost" + username = "myuser" + odata = {"HostName": hostname, "UserName": username} + _poll_imds.return_value = construct_valid_ovf_env(data=odata) + dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) + dsa._reprovision() + _poll_imds.assert_called_with() + + +class TestPreprovisioningHotAttachNics(CiTestCase): + def setUp(self): + super(TestPreprovisioningHotAttachNics, self).setUp() + self.tmp = self.tmp_dir() + self.waagent_d = self.tmp_path("/var/lib/waagent", self.tmp) + self.paths = helpers.Paths({"cloud_dir": self.tmp}) + dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d + self.paths = helpers.Paths({"cloud_dir": self.tmp}) + + @mock.patch( + "cloudinit.sources.helpers.netlink.wait_for_nic_detach_event", + autospec=True, + ) + @mock.patch(MOCKPATH + "util.write_file", autospec=True) + def test_nic_detach_writes_marker(self, m_writefile, m_detach): + """When we detect that a nic gets detached, we write a marker for it""" + dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) + nl_sock = mock.MagicMock() + dsa._wait_for_nic_detach(nl_sock) + m_detach.assert_called_with(nl_sock) + self.assertEqual(1, m_detach.call_count) + m_writefile.assert_called_with( + dsaz.REPROVISION_NIC_DETACHED_MARKER_FILE, mock.ANY + ) + + @mock.patch(MOCKPATH + "util.write_file", autospec=True) + @mock.patch(MOCKPATH + "DataSourceAzure.fallback_interface") + @mock.patch(MOCKPATH + "DataSourceAzure._report_ready") + @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") + def test_detect_nic_attach_reports_ready_and_waits_for_detach( + self, m_detach, m_report_ready, m_fallback_if, m_writefile + ): + """Report ready first and then wait for nic detach""" + dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) + dsa._wait_for_all_nics_ready() + m_fallback_if.return_value = "Dummy interface" + self.assertEqual(1, m_report_ready.call_count) + self.assertEqual(1, m_detach.call_count) + self.assertEqual(1, m_writefile.call_count) + m_writefile.assert_called_with( + dsaz.REPORTED_READY_MARKER_FILE, mock.ANY + ) + + @mock.patch("os.path.isfile") + @mock.patch(MOCKPATH + "DataSourceAzure.fallback_interface") + @mock.patch(MOCKPATH + "EphemeralDHCPv4", autospec=True) + @mock.patch(MOCKPATH + "DataSourceAzure._report_ready") + @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") + def test_detect_nic_attach_skips_report_ready_when_marker_present( + self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_isfile + ): + """Skip reporting ready if we already have a marker file.""" + dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) + + def isfile(key): + return key == dsaz.REPORTED_READY_MARKER_FILE + + m_isfile.side_effect = isfile + dsa._wait_for_all_nics_ready() + m_fallback_if.return_value = "Dummy interface" + self.assertEqual(0, m_report_ready.call_count) + self.assertEqual(0, m_dhcp.call_count) + self.assertEqual(1, m_detach.call_count) + + @mock.patch("os.path.isfile") + @mock.patch(MOCKPATH + "DataSourceAzure.fallback_interface") + @mock.patch(MOCKPATH + "EphemeralDHCPv4") + @mock.patch(MOCKPATH + "DataSourceAzure._report_ready") + @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") + def test_detect_nic_attach_skips_nic_detach_when_marker_present( + self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_isfile + ): + """Skip wait for nic detach if it already happened.""" + dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) + + m_isfile.return_value = True + dsa._wait_for_all_nics_ready() + m_fallback_if.return_value = "Dummy interface" + self.assertEqual(0, m_report_ready.call_count) + self.assertEqual(0, m_dhcp.call_count) + self.assertEqual(0, m_detach.call_count) + + @mock.patch(MOCKPATH + "DataSourceAzure.wait_for_link_up", autospec=True) + @mock.patch("cloudinit.sources.helpers.netlink.wait_for_nic_attach_event") + @mock.patch("cloudinit.sources.net.find_fallback_nic") + @mock.patch(MOCKPATH + "get_metadata_from_imds") + @mock.patch(MOCKPATH + "EphemeralDHCPv4", autospec=True) + @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") + @mock.patch("os.path.isfile") + def test_wait_for_nic_attach_if_no_fallback_interface( + self, + m_isfile, + m_detach, + m_dhcpv4, + m_imds, + m_fallback_if, + m_attach, + m_link_up, + ): + """Wait for nic attach if we do not have a fallback interface""" + dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) + lease = { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + + m_isfile.return_value = True + m_attach.return_value = "eth0" + dhcp_ctx = mock.MagicMock(lease=lease) + dhcp_ctx.obtain_lease.return_value = lease + m_dhcpv4.return_value = dhcp_ctx + m_imds.return_value = IMDS_NETWORK_METADATA + m_fallback_if.return_value = None + + dsa._wait_for_all_nics_ready() + + self.assertEqual(0, m_detach.call_count) + self.assertEqual(1, m_attach.call_count) + self.assertEqual(1, m_dhcpv4.call_count) + self.assertEqual(1, m_imds.call_count) + self.assertEqual(1, m_link_up.call_count) + m_link_up.assert_called_with(mock.ANY, "eth0") + + @mock.patch(MOCKPATH + "DataSourceAzure.wait_for_link_up") + @mock.patch("cloudinit.sources.helpers.netlink.wait_for_nic_attach_event") + @mock.patch("cloudinit.sources.net.find_fallback_nic") + @mock.patch(MOCKPATH + "DataSourceAzure.get_imds_data_with_api_fallback") + @mock.patch(MOCKPATH + "EphemeralDHCPv4", autospec=True) + @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") + @mock.patch("os.path.isfile") + def test_wait_for_nic_attach_multinic_attach( + self, + m_isfile, + m_detach, + m_dhcpv4, + m_imds, + m_fallback_if, + m_attach, + m_link_up, + ): + """Wait for nic attach if we do not have a fallback interface""" + dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) + lease = { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + + # Simulate two NICs by adding the same one twice. + md = { + "interface": [ + IMDS_NETWORK_METADATA["interface"][0], + IMDS_NETWORK_METADATA["interface"][0], + ] + } + + m_isfile.return_value = True + m_attach.side_effect = [ + "eth0", + "eth1", + ] + dhcp_ctx = mock.MagicMock(lease=lease) + dhcp_ctx.obtain_lease.return_value = lease + m_dhcpv4.return_value = dhcp_ctx + m_imds.side_effect = [md] + m_fallback_if.return_value = None + + dsa._wait_for_all_nics_ready() + + self.assertEqual(0, m_detach.call_count) + self.assertEqual(2, m_attach.call_count) + # DHCP and network metadata calls will only happen on the primary NIC. + self.assertEqual(1, m_dhcpv4.call_count) + self.assertEqual(1, m_imds.call_count) + self.assertEqual(2, m_link_up.call_count) + + @mock.patch("cloudinit.url_helper.time.sleep", autospec=True) + @mock.patch("requests.Session.request", autospec=True) + @mock.patch(MOCKPATH + "EphemeralDHCPv4", autospec=True) + def test_check_if_nic_is_primary_retries_on_failures( + self, m_dhcpv4, m_request, m_sleep + ): + """Retry polling for network metadata on all failures except timeout + and network unreachable errors""" + dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) + lease = { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + + # Simulate two NICs by adding the same one twice. + md = { + "interface": [ + IMDS_NETWORK_METADATA["interface"][0], + IMDS_NETWORK_METADATA["interface"][0], + ] + } + + m_req = mock.Mock(content=json.dumps(md)) + m_request.side_effect = [ + requests.Timeout("Fake connection timeout"), + requests.ConnectionError("Fake Network Unreachable"), + m_req, + ] + m_dhcpv4.return_value.lease = lease + + is_primary, expected_nic_count = dsa._check_if_nic_is_primary("eth0") + self.assertEqual(True, is_primary) + self.assertEqual(2, expected_nic_count) + assert len(m_request.mock_calls) == 3 + + # Re-run tests to verify max retries. + m_request.reset_mock() + m_request.side_effect = [ + requests.Timeout("Fake connection timeout") + ] * 6 + [requests.ConnectionError("Fake Network Unreachable")] * 6 + + dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) + + is_primary, expected_nic_count = dsa._check_if_nic_is_primary("eth1") + self.assertEqual(False, is_primary) + assert len(m_request.mock_calls) == 11 + + @mock.patch("cloudinit.distros.networking.LinuxNetworking.try_set_link_up") + def test_wait_for_link_up_returns_if_already_up(self, m_is_link_up): + """Waiting for link to be up should return immediately if the link is + already up.""" + + distro_cls = distros.fetch("ubuntu") + distro = distro_cls("ubuntu", {}, self.paths) + dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths) + m_is_link_up.return_value = True + + dsa.wait_for_link_up("eth0") + self.assertEqual(1, m_is_link_up.call_count) + + @mock.patch(MOCKPATH + "net.is_up", autospec=True) + @mock.patch(MOCKPATH + "util.write_file") + @mock.patch("cloudinit.net.read_sys_net", return_value="device-id") + @mock.patch("cloudinit.distros.networking.LinuxNetworking.try_set_link_up") + def test_wait_for_link_up_checks_link_after_sleep( + self, m_try_set_link_up, m_read_sys_net, m_writefile, m_is_up + ): + """Waiting for link to be up should return immediately if the link is + already up.""" + + distro_cls = distros.fetch("ubuntu") + distro = distro_cls("ubuntu", {}, self.paths) + dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths) + m_try_set_link_up.return_value = False + + callcount = 0 + + def is_up_mock(key): + nonlocal callcount + if callcount == 0: + callcount += 1 + return False + return True + + m_is_up.side_effect = is_up_mock + + with mock.patch("cloudinit.sources.DataSourceAzure.sleep"): + dsa.wait_for_link_up("eth0") + self.assertEqual(2, m_try_set_link_up.call_count) + self.assertEqual(2, m_is_up.call_count) + + @mock.patch(MOCKPATH + "util.write_file") + @mock.patch("cloudinit.net.read_sys_net", return_value="device-id") + @mock.patch("cloudinit.distros.networking.LinuxNetworking.try_set_link_up") + def test_wait_for_link_up_writes_to_device_file( + self, m_is_link_up, m_read_sys_net, m_writefile + ): + """Waiting for link to be up should return immediately if the link is + already up.""" + + distro_cls = distros.fetch("ubuntu") + distro = distro_cls("ubuntu", {}, self.paths) + dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths) + + callcount = 0 + + def linkup(key): + nonlocal callcount + if callcount == 0: + callcount += 1 + return False + return True + + m_is_link_up.side_effect = linkup + + dsa.wait_for_link_up("eth0") + self.assertEqual(2, m_is_link_up.call_count) + self.assertEqual(1, m_read_sys_net.call_count) + self.assertEqual(2, m_writefile.call_count) + + @mock.patch( + "cloudinit.sources.helpers.netlink.create_bound_netlink_socket" + ) + def test_wait_for_all_nics_ready_raises_if_socket_fails(self, m_socket): + """Waiting for all nics should raise exception if netlink socket + creation fails.""" + + m_socket.side_effect = netlink.NetlinkCreateSocketError + distro_cls = distros.fetch("ubuntu") + distro = distro_cls("ubuntu", {}, self.paths) + dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths) + + self.assertRaises( + netlink.NetlinkCreateSocketError, dsa._wait_for_all_nics_ready + ) + # dsa._wait_for_all_nics_ready() + + +@mock.patch("cloudinit.net.find_fallback_nic", return_value="eth9") +@mock.patch("cloudinit.net.dhcp.EphemeralIPv4Network") +@mock.patch("cloudinit.net.dhcp.maybe_perform_dhcp_discovery") +@mock.patch( + "cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect" +) +@mock.patch("requests.Session.request") +@mock.patch(MOCKPATH + "DataSourceAzure._report_ready", return_value=True) +class TestPreprovisioningPollIMDS(CiTestCase): + def setUp(self): + super(TestPreprovisioningPollIMDS, self).setUp() + self.tmp = self.tmp_dir() + self.waagent_d = self.tmp_path("/var/lib/waagent", self.tmp) + self.paths = helpers.Paths({"cloud_dir": self.tmp}) + dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d + + @mock.patch("time.sleep", mock.MagicMock()) + def test_poll_imds_re_dhcp_on_timeout( + self, + m_report_ready, + m_request, + m_media_switch, + m_dhcp, + m_net, + m_fallback, + ): + """The poll_imds will retry DHCP on IMDS timeout.""" + report_file = self.tmp_path("report_marker", self.tmp) + lease = { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + m_dhcp.return_value = [lease] + m_media_switch.return_value = None + dhcp_ctx = mock.MagicMock(lease=lease) + dhcp_ctx.obtain_lease.return_value = lease + + self.tries = 0 + + def fake_timeout_once(**kwargs): + self.tries += 1 + if self.tries == 1: + raise requests.Timeout("Fake connection timeout") + elif self.tries in (2, 3): + response = requests.Response() + response.status_code = 404 if self.tries == 2 else 410 + raise requests.exceptions.HTTPError( + "fake {}".format(response.status_code), response=response + ) + # Third try should succeed and stop retries or redhcp + return mock.MagicMock(status_code=200, text="good", content="good") + + m_request.side_effect = fake_timeout_once + + dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) + with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): + dsa._poll_imds() + + assert m_report_ready.mock_calls == [mock.call()] + + self.assertEqual(3, m_dhcp.call_count, "Expected 3 DHCP calls") + self.assertEqual(4, self.tries, "Expected 4 total reads from IMDS") + + @mock.patch("os.path.isfile") + def test_poll_imds_skips_dhcp_if_ctx_present( + self, + m_isfile, + report_ready_func, + fake_resp, + m_media_switch, + m_dhcp, + m_net, + m_fallback, + ): + """The poll_imds function should reuse the dhcp ctx if it is already + present. This happens when we wait for nic to be hot-attached before + polling for reprovisiondata. Note that if this ctx is set when + _poll_imds is called, then it is not expected to be waiting for + media_disconnect_connect either.""" + report_file = self.tmp_path("report_marker", self.tmp) + m_isfile.return_value = True + dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) + dsa._ephemeral_dhcp_ctx = mock.Mock(lease={}) + with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): + dsa._poll_imds() + self.assertEqual(0, m_dhcp.call_count) + self.assertEqual(0, m_media_switch.call_count) + + @mock.patch("os.path.isfile") + @mock.patch(MOCKPATH + "EphemeralDHCPv4", autospec=True) + def test_poll_imds_does_dhcp_on_retries_if_ctx_present( + self, + m_ephemeral_dhcpv4, + m_isfile, + report_ready_func, + m_request, + m_media_switch, + m_dhcp, + m_net, + m_fallback, + ): + """The poll_imds function should reuse the dhcp ctx if it is already + present. This happens when we wait for nic to be hot-attached before + polling for reprovisiondata. Note that if this ctx is set when + _poll_imds is called, then it is not expected to be waiting for + media_disconnect_connect either.""" + + tries = 0 + + def fake_timeout_once(**kwargs): + nonlocal tries + tries += 1 + if tries == 1: + raise requests.Timeout("Fake connection timeout") + return mock.MagicMock(status_code=200, text="good", content="good") + + m_request.side_effect = fake_timeout_once + report_file = self.tmp_path("report_marker", self.tmp) + m_isfile.return_value = True + dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) + with mock.patch( + MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file + ), mock.patch.object(dsa, "_ephemeral_dhcp_ctx") as m_dhcp_ctx: + m_dhcp_ctx.obtain_lease.return_value = "Dummy lease" + dsa._ephemeral_dhcp_ctx = m_dhcp_ctx + dsa._poll_imds() + self.assertEqual(1, m_dhcp_ctx.clean_network.call_count) + self.assertEqual(1, m_ephemeral_dhcpv4.call_count) + self.assertEqual(0, m_media_switch.call_count) + self.assertEqual(2, m_request.call_count) + + def test_does_not_poll_imds_report_ready_when_marker_file_exists( + self, + m_report_ready, + m_request, + m_media_switch, + m_dhcp, + m_net, + m_fallback, + ): + """poll_imds should not call report ready when the reported ready + marker file exists""" + report_file = self.tmp_path("report_marker", self.tmp) + write_file(report_file, content="dont run report_ready :)") + m_dhcp.return_value = [ + { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + ] + m_media_switch.return_value = None + dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) + with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): + dsa._poll_imds() + self.assertEqual(m_report_ready.call_count, 0) + + def test_poll_imds_report_ready_success_writes_marker_file( + self, + m_report_ready, + m_request, + m_media_switch, + m_dhcp, + m_net, + m_fallback, + ): + """poll_imds should write the report_ready marker file if + reporting ready succeeds""" + report_file = self.tmp_path("report_marker", self.tmp) + m_dhcp.return_value = [ + { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + ] + m_media_switch.return_value = None + dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) + self.assertFalse(os.path.exists(report_file)) + with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): + dsa._poll_imds() + self.assertEqual(m_report_ready.call_count, 1) + self.assertTrue(os.path.exists(report_file)) + + def test_poll_imds_report_ready_failure_raises_exc_and_doesnt_write_marker( + self, + m_report_ready, + m_request, + m_media_switch, + m_dhcp, + m_net, + m_fallback, + ): + """poll_imds should write the report_ready marker file if + reporting ready succeeds""" + report_file = self.tmp_path("report_marker", self.tmp) + m_dhcp.return_value = [ + { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + ] + m_media_switch.return_value = None + m_report_ready.side_effect = [Exception("fail")] + dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) + self.assertFalse(os.path.exists(report_file)) + with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): + self.assertRaises(InvalidMetaDataException, dsa._poll_imds) + self.assertEqual(m_report_ready.call_count, 1) + self.assertFalse(os.path.exists(report_file)) + + +@mock.patch(MOCKPATH + "DataSourceAzure._report_ready", mock.MagicMock()) +@mock.patch(MOCKPATH + "subp.subp", mock.MagicMock()) +@mock.patch(MOCKPATH + "util.write_file", mock.MagicMock()) +@mock.patch( + "cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect" +) +@mock.patch("cloudinit.net.dhcp.EphemeralIPv4Network", autospec=True) +@mock.patch("cloudinit.net.dhcp.maybe_perform_dhcp_discovery") +@mock.patch("requests.Session.request") +class TestAzureDataSourcePreprovisioning(CiTestCase): + def setUp(self): + super(TestAzureDataSourcePreprovisioning, self).setUp() + tmp = self.tmp_dir() + self.waagent_d = self.tmp_path("/var/lib/waagent", tmp) + self.paths = helpers.Paths({"cloud_dir": tmp}) + dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d + + def test_poll_imds_returns_ovf_env( + self, m_request, m_dhcp, m_net, m_media_switch + ): + """The _poll_imds method should return the ovf_env.xml.""" + m_media_switch.return_value = None + m_dhcp.return_value = [ + { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + } + ] + url = "http://{0}/metadata/reprovisiondata?api-version=2019-06-01" + host = "169.254.169.254" + full_url = url.format(host) + m_request.return_value = mock.MagicMock( + status_code=200, text="ovf", content="ovf" + ) + dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) + self.assertTrue(len(dsa._poll_imds()) > 0) + self.assertEqual( + m_request.call_args_list, + [ + mock.call( + allow_redirects=True, + headers={ + "Metadata": "true", + "User-Agent": "Cloud-Init/%s" % vs(), + }, + method="GET", + timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS, + url=full_url, + ) + ], + ) + self.assertEqual(m_dhcp.call_count, 2) + m_net.assert_any_call( + broadcast="192.168.2.255", + interface="eth9", + ip="192.168.2.9", + prefix_or_mask="255.255.255.0", + router="192.168.2.1", + static_routes=None, + ) + self.assertEqual(m_net.call_count, 2) + + def test__reprovision_calls__poll_imds( + self, m_request, m_dhcp, m_net, m_media_switch + ): + """The _reprovision method should call poll IMDS.""" + m_media_switch.return_value = None + m_dhcp.return_value = [ + { + "interface": "eth9", + "fixed-address": "192.168.2.9", + "routers": "192.168.2.1", + "subnet-mask": "255.255.255.0", + "unknown-245": "624c3620", + } + ] + url = "http://{0}/metadata/reprovisiondata?api-version=2019-06-01" + host = "169.254.169.254" + full_url = url.format(host) + hostname = "myhost" + username = "myuser" + odata = {"HostName": hostname, "UserName": username} + content = construct_valid_ovf_env(data=odata) + m_request.return_value = mock.MagicMock( + status_code=200, text=content, content=content + ) + dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) + md, _ud, cfg, _d = dsa._reprovision() + self.assertEqual(md["local-hostname"], hostname) + self.assertEqual(cfg["system_info"]["default_user"]["name"], username) + self.assertIn( + mock.call( + allow_redirects=True, + headers={ + "Metadata": "true", + "User-Agent": "Cloud-Init/%s" % vs(), + }, + method="GET", + timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS, + url=full_url, + ), + m_request.call_args_list, + ) + self.assertEqual(m_dhcp.call_count, 2) + m_net.assert_any_call( + broadcast="192.168.2.255", + interface="eth9", + ip="192.168.2.9", + prefix_or_mask="255.255.255.0", + router="192.168.2.1", + static_routes=None, + ) + self.assertEqual(m_net.call_count, 2) + + +class TestRemoveUbuntuNetworkConfigScripts(CiTestCase): + + with_logs = True + + def setUp(self): + super(TestRemoveUbuntuNetworkConfigScripts, self).setUp() + self.tmp = self.tmp_dir() + + def test_remove_network_scripts_removes_both_files_and_directories(self): + """Any files or directories in paths are removed when present.""" + file1 = self.tmp_path("file1", dir=self.tmp) + subdir = self.tmp_path("sub1", dir=self.tmp) + subfile = self.tmp_path("leaf1", dir=subdir) + write_file(file1, "file1content") + write_file(subfile, "leafcontent") + dsaz.maybe_remove_ubuntu_network_config_scripts(paths=[subdir, file1]) + + for path in (file1, subdir, subfile): + self.assertFalse( + os.path.exists(path), "Found unremoved: %s" % path + ) + + expected_logs = [ + "INFO: Removing Ubuntu extended network scripts because cloud-init" + " updates Azure network configuration on the following events:" + " ['boot', 'boot-legacy']", + "Recursively deleting %s" % subdir, + "Attempting to remove %s" % file1, + ] + for log in expected_logs: + self.assertIn(log, self.logs.getvalue()) + + def test_remove_network_scripts_only_attempts_removal_if_path_exists(self): + """Any files or directories absent are skipped without error.""" + dsaz.maybe_remove_ubuntu_network_config_scripts( + paths=[ + self.tmp_path("nodirhere/", dir=self.tmp), + self.tmp_path("notfilehere", dir=self.tmp), + ] + ) + self.assertNotIn("/not/a", self.logs.getvalue()) # No delete logs + + @mock.patch(MOCKPATH + "os.path.exists") + def test_remove_network_scripts_default_removes_stock_scripts( + self, m_exists + ): + """Azure's stock ubuntu image scripts and artifacts are removed.""" + # Report path absent on all to avoid delete operation + m_exists.return_value = False + dsaz.maybe_remove_ubuntu_network_config_scripts() + calls = m_exists.call_args_list + for path in dsaz.UBUNTU_EXTENDED_NETWORK_SCRIPTS: + self.assertIn(mock.call(path), calls) + + +class TestWBIsPlatformViable(CiTestCase): + """White box tests for _is_platform_viable.""" + + with_logs = True + + @mock.patch(MOCKPATH + "dmi.read_dmi_data") + def test_true_on_non_azure_chassis(self, m_read_dmi_data): + """Return True if DMI chassis-asset-tag is AZURE_CHASSIS_ASSET_TAG.""" + m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG + self.assertTrue(dsaz._is_platform_viable("doesnotmatter")) + + @mock.patch(MOCKPATH + "os.path.exists") + @mock.patch(MOCKPATH + "dmi.read_dmi_data") + def test_true_on_azure_ovf_env_in_seed_dir(self, m_read_dmi_data, m_exist): + """Return True if ovf-env.xml exists in known seed dirs.""" + # Non-matching Azure chassis-asset-tag + m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG + "X" + + m_exist.return_value = True + self.assertTrue(dsaz._is_platform_viable("/some/seed/dir")) + m_exist.called_once_with("/other/seed/dir") + + def test_false_on_no_matching_azure_criteria(self): + """Report non-azure on unmatched asset tag, ovf-env absent and no dev. + + Return False when the asset tag doesn't match Azure's static + AZURE_CHASSIS_ASSET_TAG, no ovf-env.xml files exist in known seed dirs + and no devices have a label starting with prefix 'rd_rdfe_'. + """ + self.assertFalse( + wrap_and_call( + MOCKPATH, + { + "os.path.exists": False, + # Non-matching Azure chassis-asset-tag + "dmi.read_dmi_data": dsaz.AZURE_CHASSIS_ASSET_TAG + "X", + "subp.which": None, + }, + dsaz._is_platform_viable, + "doesnotmatter", + ) + ) + self.assertIn( + "DEBUG: Non-Azure DMI asset tag '{0}' discovered.\n".format( + dsaz.AZURE_CHASSIS_ASSET_TAG + "X" + ), + self.logs.getvalue(), + ) + + +class TestRandomSeed(CiTestCase): + """Test proper handling of random_seed""" + + def test_non_ascii_seed_is_serializable(self): + """Pass if a random string from the Azure infrastructure which + contains at least one non-Unicode character can be converted to/from + JSON without alteration and without throwing an exception. + """ + path = resourceLocation("azure/non_unicode_random_string") + result = dsaz._get_random_seed(path) + + obj = {"seed": result} + try: + serialized = json_dumps(obj) + deserialized = load_json(serialized) + except UnicodeDecodeError: + self.fail("Non-serializable random seed returned") + + self.assertEqual(deserialized["seed"], result) + + +class TestProvisioning: + @pytest.fixture(autouse=True) + def provisioning_setup( + self, + azure_ds, + mock_azure_get_metadata_from_fabric, + mock_azure_report_failure_to_fabric, + mock_net_dhcp_maybe_perform_dhcp_discovery, + mock_net_dhcp_EphemeralIPv4Network, + mock_dmi_read_dmi_data, + mock_get_interfaces, + mock_get_interface_mac, + mock_netlink, + mock_os_path_isfile, + mock_readurl, + mock_subp_subp, + mock_util_ensure_dir, + mock_util_find_devs_with, + mock_util_load_file, + mock_util_mount_cb, + mock_util_write_file, + ): + self.azure_ds = azure_ds + self.mock_azure_get_metadata_from_fabric = ( + mock_azure_get_metadata_from_fabric + ) + self.mock_azure_report_failure_to_fabric = ( + mock_azure_report_failure_to_fabric + ) + self.mock_net_dhcp_maybe_perform_dhcp_discovery = ( + mock_net_dhcp_maybe_perform_dhcp_discovery + ) + self.mock_net_dhcp_EphemeralIPv4Network = ( + mock_net_dhcp_EphemeralIPv4Network + ) + self.mock_dmi_read_dmi_data = mock_dmi_read_dmi_data + self.mock_get_interfaces = mock_get_interfaces + self.mock_get_interface_mac = mock_get_interface_mac + self.mock_netlink = mock_netlink + self.mock_os_path_isfile = mock_os_path_isfile + self.mock_readurl = mock_readurl + self.mock_subp_subp = mock_subp_subp + self.mock_util_ensure_dir = mock_util_ensure_dir + self.mock_util_find_devs_with = mock_util_find_devs_with + self.mock_util_load_file = mock_util_load_file + self.mock_util_mount_cb = mock_util_mount_cb + self.mock_util_write_file = mock_util_write_file + + self.imds_md = { + "extended": {"compute": {"ppsType": "None"}}, + "network": { + "interface": [ + { + "ipv4": { + "ipAddress": [ + { + "privateIpAddress": "10.0.0.22", + "publicIpAddress": "", + } + ], + "subnet": [ + {"address": "10.0.0.0", "prefix": "24"} + ], + }, + "ipv6": {"ipAddress": []}, + "macAddress": "011122334455", + }, + ] + }, + } + + def test_no_pps(self): + self.mock_readurl.side_effect = [ + mock.MagicMock(contents=json.dumps(self.imds_md).encode()), + ] + self.mock_azure_get_metadata_from_fabric.return_value = [] + self.mock_os_path_isfile.side_effect = [False, False, False] + + self.azure_ds._get_data() + + assert self.mock_os_path_isfile.mock_calls == [ + mock.call("/var/lib/cloud/data/poll_imds"), + mock.call( + os.path.join( + self.azure_ds.paths.cloud_dir, "seed/azure/ovf-env.xml" + ) + ), + mock.call("/var/lib/cloud/data/poll_imds"), + ] + + assert self.mock_readurl.mock_calls == [ + mock.call( + "http://169.254.169.254/metadata/instance?" + "api-version=2021-08-01&extended=true", + timeout=2, + headers={"Metadata": "true"}, + retries=0, + exception_cb=dsaz.retry_on_url_exc, + infinite=False, + ), + ] + + # Verify DHCP is setup once. + assert self.mock_net_dhcp_maybe_perform_dhcp_discovery.mock_calls == [ + mock.call(None, dsaz.dhcp_log_cb) + ] + assert self.azure_ds._wireserver_endpoint == "aa:bb:cc:dd" + assert self.azure_ds._is_ephemeral_networking_up() is False + + # Verify DMI usage. + assert self.mock_dmi_read_dmi_data.mock_calls == [ + mock.call("system-uuid") + ] + assert self.azure_ds.metadata["instance-id"] == "fake-system-uuid" + + # Verify IMDS metadata. + assert self.azure_ds.metadata["imds"] == self.imds_md + + # Verify reporting ready once. + assert self.mock_azure_get_metadata_from_fabric.mock_calls == [ + mock.call( + fallback_lease_file=None, + dhcp_opts="aa:bb:cc:dd", + iso_dev="/dev/sr0", + pubkey_info=None, + ) + ] + + # Verify netlink. + assert self.mock_netlink.mock_calls == [] + + def test_running_pps(self): + self.imds_md["extended"]["compute"]["ppsType"] = "Running" + ovf_data = {"HostName": "myhost", "UserName": "myuser"} + + nl_sock = mock.MagicMock() + self.mock_netlink.create_bound_netlink_socket.return_value = nl_sock + self.mock_readurl.side_effect = [ + mock.MagicMock(contents=json.dumps(self.imds_md).encode()), + mock.MagicMock( + contents=construct_valid_ovf_env(data=ovf_data).encode() + ), + mock.MagicMock(contents=json.dumps(self.imds_md).encode()), + ] + self.mock_azure_get_metadata_from_fabric.return_value = [] + self.mock_os_path_isfile.side_effect = [False, False, False, False] + + self.azure_ds._get_data() + + assert self.mock_os_path_isfile.mock_calls == [ + mock.call("/var/lib/cloud/data/poll_imds"), + mock.call( + os.path.join( + self.azure_ds.paths.cloud_dir, "seed/azure/ovf-env.xml" + ) + ), + mock.call("/var/lib/cloud/data/poll_imds"), + mock.call("/var/lib/cloud/data/reported_ready"), + ] + + assert self.mock_readurl.mock_calls == [ + mock.call( + "http://169.254.169.254/metadata/instance?" + "api-version=2021-08-01&extended=true", + timeout=2, + headers={"Metadata": "true"}, + retries=0, + exception_cb=dsaz.retry_on_url_exc, + infinite=False, + ), + mock.call( + "http://169.254.169.254/metadata/reprovisiondata?" + "api-version=2019-06-01", + timeout=2, + headers={"Metadata": "true"}, + exception_cb=mock.ANY, + infinite=True, + log_req_resp=False, + ), + mock.call( + "http://169.254.169.254/metadata/instance?" + "api-version=2021-08-01&extended=true", + timeout=2, + headers={"Metadata": "true"}, + retries=0, + exception_cb=dsaz.retry_on_url_exc, + infinite=False, + ), + ] + + # Verify DHCP is setup twice. + assert self.mock_net_dhcp_maybe_perform_dhcp_discovery.mock_calls == [ + mock.call(None, dsaz.dhcp_log_cb), + mock.call(None, dsaz.dhcp_log_cb), + ] + assert self.azure_ds._wireserver_endpoint == "aa:bb:cc:dd" + assert self.azure_ds._is_ephemeral_networking_up() is False + + # Verify DMI usage. + assert self.mock_dmi_read_dmi_data.mock_calls == [ + mock.call("system-uuid") + ] + assert self.azure_ds.metadata["instance-id"] == "fake-system-uuid" + + # Verify IMDS metadata. + assert self.azure_ds.metadata["imds"] == self.imds_md + + # Verify reporting ready twice. + assert self.mock_azure_get_metadata_from_fabric.mock_calls == [ + mock.call( + fallback_lease_file=None, + dhcp_opts="aa:bb:cc:dd", + iso_dev="/dev/sr0", + pubkey_info=None, + ), + mock.call( + fallback_lease_file=None, + dhcp_opts="aa:bb:cc:dd", + iso_dev=None, + pubkey_info=None, + ), + ] + + # Verify netlink operations for Running PPS. + assert self.mock_netlink.mock_calls == [ + mock.call.create_bound_netlink_socket(), + mock.call.wait_for_media_disconnect_connect(mock.ANY, "ethBoot0"), + mock.call.create_bound_netlink_socket().__bool__(), + mock.call.create_bound_netlink_socket().close(), + ] + + def test_savable_pps(self): + self.imds_md["extended"]["compute"]["ppsType"] = "Savable" + ovf_data = {"HostName": "myhost", "UserName": "myuser"} + + nl_sock = mock.MagicMock() + self.mock_netlink.create_bound_netlink_socket.return_value = nl_sock + self.mock_netlink.wait_for_nic_detach_event.return_value = "eth9" + self.mock_netlink.wait_for_nic_attach_event.return_value = ( + "ethAttached1" + ) + self.mock_readurl.side_effect = [ + mock.MagicMock(contents=json.dumps(self.imds_md).encode()), + mock.MagicMock( + contents=json.dumps(self.imds_md["network"]).encode() + ), + mock.MagicMock( + contents=construct_valid_ovf_env(data=ovf_data).encode() + ), + mock.MagicMock(contents=json.dumps(self.imds_md).encode()), + ] + self.mock_azure_get_metadata_from_fabric.return_value = [] + self.mock_os_path_isfile.side_effect = [ + False, # /var/lib/cloud/data/poll_imds + False, # seed/azure/ovf-env.xml + False, # /var/lib/cloud/data/poll_imds + False, # /var/lib/cloud/data/reported_ready + False, # /var/lib/cloud/data/reported_ready + False, # /var/lib/cloud/data/nic_detached + True, # /var/lib/cloud/data/reported_ready + ] + self.azure_ds._fallback_interface = False + + self.azure_ds._get_data() + + assert self.mock_os_path_isfile.mock_calls == [ + mock.call("/var/lib/cloud/data/poll_imds"), + mock.call( + os.path.join( + self.azure_ds.paths.cloud_dir, "seed/azure/ovf-env.xml" + ) + ), + mock.call("/var/lib/cloud/data/poll_imds"), + mock.call("/var/lib/cloud/data/reported_ready"), + mock.call("/var/lib/cloud/data/reported_ready"), + mock.call("/var/lib/cloud/data/nic_detached"), + mock.call("/var/lib/cloud/data/reported_ready"), + ] + + assert self.mock_readurl.mock_calls == [ + mock.call( + "http://169.254.169.254/metadata/instance?" + "api-version=2021-08-01&extended=true", + timeout=2, + headers={"Metadata": "true"}, + retries=0, + exception_cb=dsaz.retry_on_url_exc, + infinite=False, + ), + mock.call( + "http://169.254.169.254/metadata/instance/network?" + "api-version=2019-06-01", + timeout=2, + headers={"Metadata": "true"}, + retries=0, + exception_cb=mock.ANY, + infinite=True, + ), + mock.call( + "http://169.254.169.254/metadata/reprovisiondata?" + "api-version=2019-06-01", + timeout=2, + headers={"Metadata": "true"}, + exception_cb=mock.ANY, + infinite=True, + log_req_resp=False, + ), + mock.call( + "http://169.254.169.254/metadata/instance?" + "api-version=2021-08-01&extended=true", + timeout=2, + headers={"Metadata": "true"}, + retries=0, + exception_cb=dsaz.retry_on_url_exc, + infinite=False, + ), + ] + + # Verify DHCP is setup twice. + assert self.mock_net_dhcp_maybe_perform_dhcp_discovery.mock_calls == [ + mock.call(None, dsaz.dhcp_log_cb), + mock.call("ethAttached1", dsaz.dhcp_log_cb), + ] + assert self.azure_ds._wireserver_endpoint == "aa:bb:cc:dd" + assert self.azure_ds._is_ephemeral_networking_up() is False + + # Verify DMI usage. + assert self.mock_dmi_read_dmi_data.mock_calls == [ + mock.call("system-uuid") + ] + assert self.azure_ds.metadata["instance-id"] == "fake-system-uuid" + + # Verify IMDS metadata. + assert self.azure_ds.metadata["imds"] == self.imds_md + + # Verify reporting ready twice. + assert self.mock_azure_get_metadata_from_fabric.mock_calls == [ + mock.call( + fallback_lease_file=None, + dhcp_opts="aa:bb:cc:dd", + iso_dev="/dev/sr0", + pubkey_info=None, + ), + mock.call( + fallback_lease_file=None, + dhcp_opts="aa:bb:cc:dd", + iso_dev=None, + pubkey_info=None, + ), + ] + + # Verify netlink operations for Savable PPS. + assert self.mock_netlink.mock_calls == [ + mock.call.create_bound_netlink_socket(), + mock.call.wait_for_nic_detach_event(nl_sock), + mock.call.wait_for_nic_attach_event(nl_sock, ["ethAttached1"]), + mock.call.create_bound_netlink_socket().__bool__(), + mock.call.create_bound_netlink_socket().close(), + ] + + +class TestValidateIMDSMetadata: + @pytest.mark.parametrize( + "mac,expected", + [ + ("001122aabbcc", "00:11:22:aa:bb:cc"), + ("001122AABBCC", "00:11:22:aa:bb:cc"), + ("00:11:22:aa:bb:cc", "00:11:22:aa:bb:cc"), + ("00:11:22:AA:BB:CC", "00:11:22:aa:bb:cc"), + ("pass-through-the-unexpected", "pass-through-the-unexpected"), + ("", ""), + ], + ) + def test_normalize_scenarios(self, mac, expected): + normalized = dsaz.normalize_mac_address(mac) + assert normalized == expected + + def test_empty( + self, azure_ds, caplog, mock_get_interfaces, mock_get_interface_mac + ): + imds_md = {} + + assert azure_ds.validate_imds_network_metadata(imds_md) is False + assert ( + "cloudinit.sources.DataSourceAzure", + 30, + "IMDS network metadata has incomplete configuration: None", + ) in caplog.record_tuples + + def test_validates_one_nic( + self, azure_ds, mock_get_interfaces, mock_get_interface_mac + ): + + mock_get_interfaces.return_value = [ + ("dummy0", "9e:65:d6:19:19:01", None, None), + ("test0", "00:11:22:33:44:55", "hv_netvsc", "0x3"), + ("lo", "00:00:00:00:00:00", None, None), + ] + azure_ds._ephemeral_dhcp_ctx = mock.Mock(iface="test0") + + imds_md = { + "network": { + "interface": [ + { + "ipv4": { + "ipAddress": [ + { + "privateIpAddress": "10.0.0.22", + "publicIpAddress": "", + } + ], + "subnet": [ + {"address": "10.0.0.0", "prefix": "24"} + ], + }, + "ipv6": {"ipAddress": []}, + "macAddress": "001122334455", + } + ] + } + } + + assert azure_ds.validate_imds_network_metadata(imds_md) is True + + def test_validates_multiple_nic( + self, azure_ds, mock_get_interfaces, mock_get_interface_mac + ): + + mock_get_interfaces.return_value = [ + ("dummy0", "9e:65:d6:19:19:01", None, None), + ("test0", "00:11:22:33:44:55", "hv_netvsc", "0x3"), + ("test1", "01:11:22:33:44:55", "hv_netvsc", "0x3"), + ("lo", "00:00:00:00:00:00", None, None), + ] + azure_ds._ephemeral_dhcp_ctx = mock.Mock(iface="test0") + + imds_md = { + "network": { + "interface": [ + { + "ipv4": { + "ipAddress": [ + { + "privateIpAddress": "10.0.0.22", + "publicIpAddress": "", + } + ], + "subnet": [ + {"address": "10.0.0.0", "prefix": "24"} + ], + }, + "ipv6": {"ipAddress": []}, + "macAddress": "001122334455", + }, + { + "ipv4": { + "ipAddress": [ + { + "privateIpAddress": "10.0.0.22", + "publicIpAddress": "", + } + ], + "subnet": [ + {"address": "10.0.0.0", "prefix": "24"} + ], + }, + "ipv6": {"ipAddress": []}, + "macAddress": "011122334455", + }, + ] + } + } + + assert azure_ds.validate_imds_network_metadata(imds_md) is True + + def test_missing_all( + self, azure_ds, caplog, mock_get_interfaces, mock_get_interface_mac + ): + + mock_get_interfaces.return_value = [ + ("dummy0", "9e:65:d6:19:19:01", None, None), + ("test0", "00:11:22:33:44:55", "hv_netvsc", "0x3"), + ("test1", "01:11:22:33:44:55", "hv_netvsc", "0x3"), + ("lo", "00:00:00:00:00:00", None, None), + ] + azure_ds._ephemeral_dhcp_ctx = mock.Mock(iface="test0") + + imds_md = {"network": {"interface": []}} + + assert azure_ds.validate_imds_network_metadata(imds_md) is False + assert ( + "cloudinit.sources.DataSourceAzure", + 30, + "IMDS network metadata is missing configuration for NICs " + "['00:11:22:33:44:55', '01:11:22:33:44:55']: " + f"{imds_md['network']!r}", + ) in caplog.record_tuples + + def test_missing_primary( + self, azure_ds, caplog, mock_get_interfaces, mock_get_interface_mac + ): + + mock_get_interfaces.return_value = [ + ("dummy0", "9e:65:d6:19:19:01", None, None), + ("test0", "00:11:22:33:44:55", "hv_netvsc", "0x3"), + ("test1", "01:11:22:33:44:55", "hv_netvsc", "0x3"), + ("lo", "00:00:00:00:00:00", None, None), + ] + azure_ds._ephemeral_dhcp_ctx = mock.Mock(iface="test0") + + imds_md = { + "network": { + "interface": [ + { + "ipv4": { + "ipAddress": [ + { + "privateIpAddress": "10.0.0.22", + "publicIpAddress": "", + } + ], + "subnet": [ + {"address": "10.0.0.0", "prefix": "24"} + ], + }, + "ipv6": {"ipAddress": []}, + "macAddress": "011122334455", + }, + ] + } + } + + assert azure_ds.validate_imds_network_metadata(imds_md) is False + assert ( + "cloudinit.sources.DataSourceAzure", + 30, + "IMDS network metadata is missing configuration for NICs " + f"['00:11:22:33:44:55']: {imds_md['network']!r}", + ) in caplog.record_tuples + assert ( + "cloudinit.sources.DataSourceAzure", + 30, + "IMDS network metadata is missing primary NIC " + f"'00:11:22:33:44:55': {imds_md['network']!r}", + ) in caplog.record_tuples + + def test_missing_secondary( + self, azure_ds, mock_get_interfaces, mock_get_interface_mac + ): + + mock_get_interfaces.return_value = [ + ("dummy0", "9e:65:d6:19:19:01", None, None), + ("test0", "00:11:22:33:44:55", "hv_netvsc", "0x3"), + ("test1", "01:11:22:33:44:55", "hv_netvsc", "0x3"), + ("lo", "00:00:00:00:00:00", None, None), + ] + azure_ds._ephemeral_dhcp_ctx = mock.Mock(iface="test0") + + imds_md = { + "network": { + "interface": [ + { + "ipv4": { + "ipAddress": [ + { + "privateIpAddress": "10.0.0.22", + "publicIpAddress": "", + } + ], + "subnet": [ + {"address": "10.0.0.0", "prefix": "24"} + ], + }, + "ipv6": {"ipAddress": []}, + "macAddress": "001122334455", + }, + ] + } + } + + assert azure_ds.validate_imds_network_metadata(imds_md) is False + + +# vi: ts=4 expandtab |