# This file is part of cloud-init. See LICENSE file for license information. import copy import crypt import json import os import stat import xml.etree.ElementTree as ET import httpretty import requests import yaml from cloudinit import distros, helpers, url_helper from cloudinit.sources import UNSET from cloudinit.sources import DataSourceAzure as dsaz from cloudinit.sources import InvalidMetaDataException from cloudinit.sources.helpers import netlink from cloudinit.util import ( MountFailedError, b64e, decode_binary, json_dumps, load_file, load_json, write_file, ) from cloudinit.version import version_string as vs from tests.unittests.helpers import ( CiTestCase, ExitStack, HttprettyTestCase, mock, populate_dir, resourceLocation, wrap_and_call, ) def construct_valid_ovf_env( data=None, pubkeys=None, userdata=None, platform_settings=None ): if data is None: data = {"HostName": "FOOHOST"} if pubkeys is None: pubkeys = {} content = """ 1.0 LinuxProvisioningConfiguration """ for key, dval in data.items(): if isinstance(dval, dict): val = dict(dval).get("text") attrs = " " + " ".join( [ "%s='%s'" % (k, v) for k, v in dict(dval).items() if k != "text" ] ) else: val = dval attrs = "" content += "<%s%s>%s\n" % (key, attrs, val, key) if userdata: content += "%s\n" % (b64e(userdata)) if pubkeys: content += "\n" for fp, path, value in pubkeys: content += " " if fp and path: content += "%s%s" % ( fp, path, ) if value: content += "%s" % value content += "\n" content += "" content += """ 1.0 kms.core.windows.net false """ if platform_settings: for k, v in platform_settings.items(): content += "<%s>%s\n" % (k, v, k) if "PreprovisionedVMType" not in platform_settings: content += """""" content += """ """ return content NETWORK_METADATA = { "compute": { "location": "eastus2", "name": "my-hostname", "offer": "UbuntuServer", "osType": "Linux", "placementGroupId": "", "platformFaultDomain": "0", "platformUpdateDomain": "0", "publisher": "Canonical", "resourceGroupName": "srugroup1", "sku": "19.04-DAILY", "subscriptionId": "12aad61c-6de4-4e53-a6c6-5aff52a83777", "tags": "", "version": "19.04.201906190", "vmId": "ff702a6b-cb6a-4fcd-ad68-b4ce38227642", "vmScaleSetName": "", "vmSize": "Standard_DS1_v2", "zone": "", "publicKeys": [{"keyData": "ssh-rsa key1", "path": "path1"}], }, "network": { "interface": [ { "macAddress": "000D3A047598", "ipv6": {"ipAddress": []}, "ipv4": { "subnet": [{"prefix": "24", "address": "10.0.0.0"}], "ipAddress": [ { "privateIpAddress": "10.0.0.4", "publicIpAddress": "104.46.124.81", } ], }, } ] }, } SECONDARY_INTERFACE = { "macAddress": "220D3A047598", "ipv6": {"ipAddress": []}, "ipv4": { "subnet": [{"prefix": "24", "address": "10.0.1.0"}], "ipAddress": [ { "privateIpAddress": "10.0.1.5", } ], }, } SECONDARY_INTERFACE_NO_IP = { "macAddress": "220D3A047598", "ipv6": {"ipAddress": []}, "ipv4": { "subnet": [{"prefix": "24", "address": "10.0.1.0"}], "ipAddress": [], }, } IMDS_NETWORK_METADATA = { "interface": [ { "macAddress": "000D3A047598", "ipv6": {"ipAddress": []}, "ipv4": { "subnet": [{"prefix": "24", "address": "10.0.0.0"}], "ipAddress": [ { "privateIpAddress": "10.0.0.4", "publicIpAddress": "104.46.124.81", } ], }, } ] } MOCKPATH = "cloudinit.sources.DataSourceAzure." EXAMPLE_UUID = "d0df4c54-4ecb-4a4b-9954-5bdf3ed5c3b8" class TestParseNetworkConfig(CiTestCase): maxDiff = None fallback_config = { "version": 1, "config": [ { "type": "physical", "name": "eth0", "mac_address": "00:11:22:33:44:55", "params": {"driver": "hv_netsvc"}, "subnets": [{"type": "dhcp"}], } ], } @mock.patch( "cloudinit.sources.DataSourceAzure.device_driver", return_value=None ) def test_single_ipv4_nic_configuration(self, m_driver): """parse_network_config emits dhcp on single nic with ipv4""" expected = { "ethernets": { "eth0": { "dhcp4": True, "dhcp4-overrides": {"route-metric": 100}, "dhcp6": False, "match": {"macaddress": "00:0d:3a:04:75:98"}, "set-name": "eth0", } }, "version": 2, } self.assertEqual(expected, dsaz.parse_network_config(NETWORK_METADATA)) @mock.patch( "cloudinit.sources.DataSourceAzure.device_driver", return_value=None ) def test_increases_route_metric_for_non_primary_nics(self, m_driver): """parse_network_config increases route-metric for each nic""" expected = { "ethernets": { "eth0": { "dhcp4": True, "dhcp4-overrides": {"route-metric": 100}, "dhcp6": False, "match": {"macaddress": "00:0d:3a:04:75:98"}, "set-name": "eth0", }, "eth1": { "set-name": "eth1", "match": {"macaddress": "22:0d:3a:04:75:98"}, "dhcp6": False, "dhcp4": True, "dhcp4-overrides": {"route-metric": 200}, }, "eth2": { "set-name": "eth2", "match": {"macaddress": "33:0d:3a:04:75:98"}, "dhcp6": False, "dhcp4": True, "dhcp4-overrides": {"route-metric": 300}, }, }, "version": 2, } imds_data = copy.deepcopy(NETWORK_METADATA) imds_data["network"]["interface"].append(SECONDARY_INTERFACE) third_intf = copy.deepcopy(SECONDARY_INTERFACE) third_intf["macAddress"] = third_intf["macAddress"].replace("22", "33") third_intf["ipv4"]["subnet"][0]["address"] = "10.0.2.0" third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6" imds_data["network"]["interface"].append(third_intf) self.assertEqual(expected, dsaz.parse_network_config(imds_data)) @mock.patch( "cloudinit.sources.DataSourceAzure.device_driver", return_value=None ) def test_ipv4_and_ipv6_route_metrics_match_for_nics(self, m_driver): """parse_network_config emits matching ipv4 and ipv6 route-metrics.""" expected = { "ethernets": { "eth0": { "addresses": ["10.0.0.5/24", "2001:dead:beef::2/128"], "dhcp4": True, "dhcp4-overrides": {"route-metric": 100}, "dhcp6": True, "dhcp6-overrides": {"route-metric": 100}, "match": {"macaddress": "00:0d:3a:04:75:98"}, "set-name": "eth0", }, "eth1": { "set-name": "eth1", "match": {"macaddress": "22:0d:3a:04:75:98"}, "dhcp4": True, "dhcp6": False, "dhcp4-overrides": {"route-metric": 200}, }, "eth2": { "set-name": "eth2", "match": {"macaddress": "33:0d:3a:04:75:98"}, "dhcp4": True, "dhcp4-overrides": {"route-metric": 300}, "dhcp6": True, "dhcp6-overrides": {"route-metric": 300}, }, }, "version": 2, } imds_data = copy.deepcopy(NETWORK_METADATA) nic1 = imds_data["network"]["interface"][0] nic1["ipv4"]["ipAddress"].append({"privateIpAddress": "10.0.0.5"}) nic1["ipv6"] = { "subnet": [{"address": "2001:dead:beef::16"}], "ipAddress": [ {"privateIpAddress": "2001:dead:beef::1"}, {"privateIpAddress": "2001:dead:beef::2"}, ], } imds_data["network"]["interface"].append(SECONDARY_INTERFACE) third_intf = copy.deepcopy(SECONDARY_INTERFACE) third_intf["macAddress"] = third_intf["macAddress"].replace("22", "33") third_intf["ipv4"]["subnet"][0]["address"] = "10.0.2.0" third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6" third_intf["ipv6"] = { "subnet": [{"prefix": "64", "address": "2001:dead:beef::2"}], "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}], } imds_data["network"]["interface"].append(third_intf) self.assertEqual(expected, dsaz.parse_network_config(imds_data)) @mock.patch( "cloudinit.sources.DataSourceAzure.device_driver", return_value=None ) def test_ipv4_secondary_ips_will_be_static_addrs(self, m_driver): """parse_network_config emits primary ipv4 as dhcp others are static""" expected = { "ethernets": { "eth0": { "addresses": ["10.0.0.5/24"], "dhcp4": True, "dhcp4-overrides": {"route-metric": 100}, "dhcp6": True, "dhcp6-overrides": {"route-metric": 100}, "match": {"macaddress": "00:0d:3a:04:75:98"}, "set-name": "eth0", } }, "version": 2, } imds_data = copy.deepcopy(NETWORK_METADATA) nic1 = imds_data["network"]["interface"][0] nic1["ipv4"]["ipAddress"].append({"privateIpAddress": "10.0.0.5"}) nic1["ipv6"] = { "subnet": [{"prefix": "10", "address": "2001:dead:beef::16"}], "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}], } self.assertEqual(expected, dsaz.parse_network_config(imds_data)) @mock.patch( "cloudinit.sources.DataSourceAzure.device_driver", return_value=None ) def test_ipv6_secondary_ips_will_be_static_cidrs(self, m_driver): """parse_network_config emits primary ipv6 as dhcp others are static""" expected = { "ethernets": { "eth0": { "addresses": ["10.0.0.5/24", "2001:dead:beef::2/10"], "dhcp4": True, "dhcp4-overrides": {"route-metric": 100}, "dhcp6": True, "dhcp6-overrides": {"route-metric": 100}, "match": {"macaddress": "00:0d:3a:04:75:98"}, "set-name": "eth0", } }, "version": 2, } imds_data = copy.deepcopy(NETWORK_METADATA) nic1 = imds_data["network"]["interface"][0] nic1["ipv4"]["ipAddress"].append({"privateIpAddress": "10.0.0.5"}) # Secondary ipv6 addresses currently ignored/unconfigured nic1["ipv6"] = { "subnet": [{"prefix": "10", "address": "2001:dead:beef::16"}], "ipAddress": [ {"privateIpAddress": "2001:dead:beef::1"}, {"privateIpAddress": "2001:dead:beef::2"}, ], } self.assertEqual(expected, dsaz.parse_network_config(imds_data)) @mock.patch( "cloudinit.sources.DataSourceAzure.device_driver", return_value="hv_netvsc", ) def test_match_driver_for_netvsc(self, m_driver): """parse_network_config emits driver when using netvsc.""" expected = { "ethernets": { "eth0": { "dhcp4": True, "dhcp4-overrides": {"route-metric": 100}, "dhcp6": False, "match": { "macaddress": "00:0d:3a:04:75:98", "driver": "hv_netvsc", }, "set-name": "eth0", } }, "version": 2, } self.assertEqual(expected, dsaz.parse_network_config(NETWORK_METADATA)) @mock.patch( "cloudinit.sources.DataSourceAzure.device_driver", return_value=None ) @mock.patch("cloudinit.net.generate_fallback_config") def test_parse_network_config_uses_fallback_cfg_when_no_network_metadata( self, m_fallback_config, m_driver ): """parse_network_config generates fallback network config when the IMDS instance metadata is corrupted/invalid, such as when network metadata is not present. """ imds_metadata_missing_network_metadata = copy.deepcopy( NETWORK_METADATA ) del imds_metadata_missing_network_metadata["network"] m_fallback_config.return_value = self.fallback_config self.assertEqual( self.fallback_config, dsaz.parse_network_config(imds_metadata_missing_network_metadata), ) @mock.patch( "cloudinit.sources.DataSourceAzure.device_driver", return_value=None ) @mock.patch("cloudinit.net.generate_fallback_config") def test_parse_network_config_uses_fallback_cfg_when_no_interface_metadata( self, m_fallback_config, m_driver ): """parse_network_config generates fallback network config when the IMDS instance metadata is corrupted/invalid, such as when network interface metadata is not present. """ imds_metadata_missing_interface_metadata = copy.deepcopy( NETWORK_METADATA ) del imds_metadata_missing_interface_metadata["network"]["interface"] m_fallback_config.return_value = self.fallback_config self.assertEqual( self.fallback_config, dsaz.parse_network_config( imds_metadata_missing_interface_metadata ), ) class TestGetMetadataFromIMDS(HttprettyTestCase): with_logs = True def setUp(self): super(TestGetMetadataFromIMDS, self).setUp() self.network_md_url = "{}/instance?api-version=2019-06-01".format( dsaz.IMDS_URL ) @mock.patch(MOCKPATH + "readurl") @mock.patch(MOCKPATH + "EphemeralDHCPv4", autospec=True) @mock.patch(MOCKPATH + "net.is_up", autospec=True) def test_get_metadata_does_not_dhcp_if_network_is_up( self, m_net_is_up, m_dhcp, m_readurl ): """Do not perform DHCP setup when nic is already up.""" m_net_is_up.return_value = True m_readurl.return_value = url_helper.StringResponse( json.dumps(NETWORK_METADATA).encode("utf-8") ) self.assertEqual( NETWORK_METADATA, dsaz.get_metadata_from_imds("eth9", retries=3) ) m_net_is_up.assert_called_with("eth9") m_dhcp.assert_not_called() self.assertIn( "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time self.logs.getvalue(), ) @mock.patch(MOCKPATH + "readurl", autospec=True) @mock.patch(MOCKPATH + "EphemeralDHCPv4") @mock.patch(MOCKPATH + "net.is_up") def test_get_metadata_uses_instance_url( self, m_net_is_up, m_dhcp, m_readurl ): """Make sure readurl is called with the correct url when accessing metadata""" m_net_is_up.return_value = True m_readurl.return_value = url_helper.StringResponse( json.dumps(IMDS_NETWORK_METADATA).encode("utf-8") ) dsaz.get_metadata_from_imds( "eth0", retries=3, md_type=dsaz.metadata_type.all ) m_readurl.assert_called_with( "http://169.254.169.254/metadata/instance?api-version=2019-06-01", exception_cb=mock.ANY, headers=mock.ANY, retries=mock.ANY, timeout=mock.ANY, infinite=False, ) @mock.patch(MOCKPATH + "readurl", autospec=True) @mock.patch(MOCKPATH + "EphemeralDHCPv4") @mock.patch(MOCKPATH + "net.is_up") def test_get_network_metadata_uses_network_url( self, m_net_is_up, m_dhcp, m_readurl ): """Make sure readurl is called with the correct url when accessing network metadata""" m_net_is_up.return_value = True m_readurl.return_value = url_helper.StringResponse( json.dumps(IMDS_NETWORK_METADATA).encode("utf-8") ) dsaz.get_metadata_from_imds( "eth0", retries=3, md_type=dsaz.metadata_type.network ) m_readurl.assert_called_with( "http://169.254.169.254/metadata/instance/network?api-version=" "2019-06-01", exception_cb=mock.ANY, headers=mock.ANY, retries=mock.ANY, timeout=mock.ANY, infinite=False, ) @mock.patch(MOCKPATH + "readurl", autospec=True) @mock.patch(MOCKPATH + "EphemeralDHCPv4") @mock.patch(MOCKPATH + "net.is_up") def test_get_default_metadata_uses_instance_url( self, m_net_is_up, m_dhcp, m_readurl ): """Make sure readurl is called with the correct url when accessing metadata""" m_net_is_up.return_value = True m_readurl.return_value = url_helper.StringResponse( json.dumps(IMDS_NETWORK_METADATA).encode("utf-8") ) dsaz.get_metadata_from_imds("eth0", retries=3) m_readurl.assert_called_with( "http://169.254.169.254/metadata/instance?api-version=2019-06-01", exception_cb=mock.ANY, headers=mock.ANY, retries=mock.ANY, timeout=mock.ANY, infinite=False, ) @mock.patch(MOCKPATH + "readurl", autospec=True) @mock.patch(MOCKPATH + "EphemeralDHCPv4") @mock.patch(MOCKPATH + "net.is_up") def test_get_metadata_uses_extended_url( self, m_net_is_up, m_dhcp, m_readurl ): """Make sure readurl is called with the correct url when accessing metadata""" m_net_is_up.return_value = True m_readurl.return_value = url_helper.StringResponse( json.dumps(IMDS_NETWORK_METADATA).encode("utf-8") ) dsaz.get_metadata_from_imds( "eth0", retries=3, md_type=dsaz.metadata_type.all, api_version="2021-08-01", ) m_readurl.assert_called_with( "http://169.254.169.254/metadata/instance?api-version=" "2021-08-01&extended=true", exception_cb=mock.ANY, headers=mock.ANY, retries=mock.ANY, timeout=mock.ANY, infinite=False, ) @mock.patch(MOCKPATH + "readurl", autospec=True) @mock.patch(MOCKPATH + "EphemeralDHCPv4WithReporting", autospec=True) @mock.patch(MOCKPATH + "net.is_up", autospec=True) def test_get_metadata_performs_dhcp_when_network_is_down( self, m_net_is_up, m_dhcp, m_readurl ): """Perform DHCP setup when nic is not up.""" m_net_is_up.return_value = False m_readurl.return_value = url_helper.StringResponse( json.dumps(NETWORK_METADATA).encode("utf-8") ) self.assertEqual( NETWORK_METADATA, dsaz.get_metadata_from_imds("eth9", retries=2) ) m_net_is_up.assert_called_with("eth9") m_dhcp.assert_called_with(mock.ANY, "eth9") self.assertIn( "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time self.logs.getvalue(), ) m_readurl.assert_called_with( self.network_md_url, exception_cb=mock.ANY, headers={"Metadata": "true"}, retries=2, timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS, infinite=False, ) @mock.patch("cloudinit.url_helper.time.sleep") @mock.patch(MOCKPATH + "net.is_up", autospec=True) def test_get_metadata_from_imds_empty_when_no_imds_present( self, m_net_is_up, m_sleep ): """Return empty dict when IMDS network metadata is absent.""" httpretty.register_uri( httpretty.GET, dsaz.IMDS_URL + "/instance?api-version=2017-12-01", body={}, status=404, ) m_net_is_up.return_value = True # skips dhcp self.assertEqual({}, dsaz.get_metadata_from_imds("eth9", retries=2)) m_net_is_up.assert_called_with("eth9") self.assertEqual([mock.call(1), mock.call(1)], m_sleep.call_args_list) self.assertIn( "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time self.logs.getvalue(), ) @mock.patch("requests.Session.request") @mock.patch("cloudinit.url_helper.time.sleep") @mock.patch(MOCKPATH + "net.is_up", autospec=True) def test_get_metadata_from_imds_retries_on_timeout( self, m_net_is_up, m_sleep, m_request ): """Retry IMDS network metadata on timeout errors.""" self.attempt = 0 m_request.side_effect = requests.Timeout("Fake Connection Timeout") def retry_callback(request, uri, headers): self.attempt += 1 raise requests.Timeout("Fake connection timeout") httpretty.register_uri( httpretty.GET, dsaz.IMDS_URL + "instance?api-version=2017-12-01", body=retry_callback, ) m_net_is_up.return_value = True # skips dhcp self.assertEqual({}, dsaz.get_metadata_from_imds("eth9", retries=3)) m_net_is_up.assert_called_with("eth9") self.assertEqual([mock.call(1)] * 3, m_sleep.call_args_list) self.assertIn( "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time self.logs.getvalue(), ) class TestAzureDataSource(CiTestCase): with_logs = True def setUp(self): super(TestAzureDataSource, self).setUp() self.tmp = self.tmp_dir() # patch cloud_dir, so our 'seed_dir' is guaranteed empty self.paths = helpers.Paths( {"cloud_dir": self.tmp, "run_dir": self.tmp} ) self.waagent_d = os.path.join(self.tmp, "var", "lib", "waagent") self.patches = ExitStack() self.addCleanup(self.patches.close) self.patches.enter_context( mock.patch.object(dsaz, "_get_random_seed", return_value="wild") ) self.m_get_metadata_from_imds = self.patches.enter_context( mock.patch.object( dsaz, "get_metadata_from_imds", mock.MagicMock(return_value=NETWORK_METADATA), ) ) self.m_fallback_nic = self.patches.enter_context( mock.patch( "cloudinit.sources.net.find_fallback_nic", return_value="eth9" ) ) self.m_remove_ubuntu_network_scripts = self.patches.enter_context( mock.patch.object( dsaz, "maybe_remove_ubuntu_network_config_scripts", mock.MagicMock(), ) ) super(TestAzureDataSource, self).setUp() def apply_patches(self, patches): for module, name, new in patches: self.patches.enter_context(mock.patch.object(module, name, new)) def _get_mockds(self): sysctl_out = ( "dev.storvsc.3.%pnpinfo: " "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f " "deviceid=f8b3781b-1e82-4818-a1c3-63d806ec15bb\n" ) sysctl_out += ( "dev.storvsc.2.%pnpinfo: " "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f " "deviceid=f8b3781a-1e82-4818-a1c3-63d806ec15bb\n" ) sysctl_out += ( "dev.storvsc.1.%pnpinfo: " "classid=32412632-86cb-44a2-9b5c-50d1417354f5 " "deviceid=00000000-0001-8899-0000-000000000000\n" ) camctl_devbus = """ scbus0 on ata0 bus 0 scbus1 on ata1 bus 0 scbus2 on blkvsc0 bus 0 scbus3 on blkvsc1 bus 0 scbus4 on storvsc2 bus 0 scbus5 on storvsc3 bus 0 scbus-1 on xpt0 bus 0 """ camctl_dev = """ at scbus1 target 0 lun 0 (cd0,pass0) at scbus2 target 0 lun 0 (da0,pass1) at scbus3 target 1 lun 0 (da1,pass2) """ self.apply_patches( [ ( dsaz, "get_dev_storvsc_sysctl", mock.MagicMock(return_value=sysctl_out), ), ( dsaz, "get_camcontrol_dev_bus", mock.MagicMock(return_value=camctl_devbus), ), ( dsaz, "get_camcontrol_dev", mock.MagicMock(return_value=camctl_dev), ), ] ) return dsaz def _get_ds( self, data, distro="ubuntu", apply_network=None, instance_id=None ): def _wait_for_files(flist, _maxwait=None, _naplen=None): data["waited"] = flist return [] def _load_possible_azure_ds(seed_dir, cache_dir): yield seed_dir yield dsaz.DEFAULT_PROVISIONING_ISO_DEV yield from data.get("dsdevs", []) if cache_dir: yield cache_dir seed_dir = os.path.join(self.paths.seed_dir, "azure") if data.get("ovfcontent") is not None: populate_dir(seed_dir, {"ovf-env.xml": data["ovfcontent"]}) dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d self.m_is_platform_viable = mock.MagicMock(autospec=True) self.m_get_metadata_from_fabric = mock.MagicMock( return_value={"public-keys": []} ) self.m_report_failure_to_fabric = mock.MagicMock(autospec=True) self.m_ephemeral_dhcpv4 = mock.MagicMock() self.m_ephemeral_dhcpv4_with_reporting = mock.MagicMock() self.m_list_possible_azure_ds = mock.MagicMock( side_effect=_load_possible_azure_ds ) if instance_id: self.instance_id = instance_id else: self.instance_id = EXAMPLE_UUID def _dmi_mocks(key): if key == "system-uuid": return self.instance_id elif key == "chassis-asset-tag": return "7783-7084-3265-9085-8269-3286-77" self.apply_patches( [ ( dsaz, "list_possible_azure_ds", self.m_list_possible_azure_ds, ), (dsaz, "_is_platform_viable", self.m_is_platform_viable), ( dsaz, "get_metadata_from_fabric", self.m_get_metadata_from_fabric, ), ( dsaz, "report_failure_to_fabric", self.m_report_failure_to_fabric, ), (dsaz, "EphemeralDHCPv4", self.m_ephemeral_dhcpv4), ( dsaz, "EphemeralDHCPv4WithReporting", self.m_ephemeral_dhcpv4_with_reporting, ), (dsaz, "get_boot_telemetry", mock.MagicMock()), (dsaz, "get_system_info", mock.MagicMock()), (dsaz.subp, "which", lambda x: True), ( dsaz.dmi, "read_dmi_data", mock.MagicMock(side_effect=_dmi_mocks), ), ( dsaz.util, "wait_for_files", mock.MagicMock(side_effect=_wait_for_files), ), ] ) if isinstance(distro, str): distro_cls = distros.fetch(distro) distro = distro_cls(distro, data.get("sys_cfg", {}), self.paths) dsrc = dsaz.DataSourceAzure( data.get("sys_cfg", {}), distro=distro, paths=self.paths ) if apply_network is not None: dsrc.ds_cfg["apply_network_config"] = apply_network return dsrc def _get_and_setup(self, dsrc): ret = dsrc.get_data() if ret: dsrc.setup(True) return ret def xml_equals(self, oxml, nxml): """Compare two sets of XML to make sure they are equal""" def create_tag_index(xml): et = ET.fromstring(xml) ret = {} for x in et.iter(): ret[x.tag] = x return ret def tags_exists(x, y): for tag in x.keys(): assert tag in y for tag in y.keys(): assert tag in x def tags_equal(x, y): for x_val in x.values(): y_val = y.get(x_val.tag) assert x_val.text == y_val.text old_cnt = create_tag_index(oxml) new_cnt = create_tag_index(nxml) tags_exists(old_cnt, new_cnt) tags_equal(old_cnt, new_cnt) def xml_notequals(self, oxml, nxml): try: self.xml_equals(oxml, nxml) except AssertionError: return raise AssertionError("XML is the same") def test_get_resource_disk(self): ds = self._get_mockds() dev = ds.get_resource_disk_on_freebsd(1) self.assertEqual("da1", dev) def test_not_is_platform_viable_seed_should_return_no_datasource(self): """Check seed_dir using _is_platform_viable and return False.""" # Return a non-matching asset tag value data = {} dsrc = self._get_ds(data) self.m_is_platform_viable.return_value = False with mock.patch.object( dsrc, "crawl_metadata" ) as m_crawl_metadata, mock.patch.object( dsrc, "_report_failure" ) as m_report_failure: ret = dsrc.get_data() self.m_is_platform_viable.assert_called_with(dsrc.seed_dir) self.assertFalse(ret) # Assert that for non viable platforms, # there is no communication with the Azure datasource. self.assertEqual(0, m_crawl_metadata.call_count) self.assertEqual(0, m_report_failure.call_count) def test_platform_viable_but_no_devs_should_return_no_datasource(self): """For platforms where the Azure platform is viable (which is indicated by the matching asset tag), the absence of any devs at all (devs == candidate sources for crawling Azure datasource) is NOT expected. Report failure to Azure as this is an unexpected fatal error. """ data = {} dsrc = self._get_ds(data) with mock.patch.object(dsrc, "_report_failure") as m_report_failure: self.m_is_platform_viable.return_value = True ret = dsrc.get_data() self.m_is_platform_viable.assert_called_with(dsrc.seed_dir) self.assertFalse(ret) self.assertEqual(1, m_report_failure.call_count) def test_crawl_metadata_exception_returns_no_datasource(self): data = {} dsrc = self._get_ds(data) self.m_is_platform_viable.return_value = True with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: m_crawl_metadata.side_effect = Exception ret = dsrc.get_data() self.m_is_platform_viable.assert_called_with(dsrc.seed_dir) self.assertEqual(1, m_crawl_metadata.call_count) self.assertFalse(ret) def test_crawl_metadata_exception_should_report_failure_with_msg(self): data = {} dsrc = self._get_ds(data) self.m_is_platform_viable.return_value = True with mock.patch.object( dsrc, "crawl_metadata" ) as m_crawl_metadata, mock.patch.object( dsrc, "_report_failure" ) as m_report_failure: m_crawl_metadata.side_effect = Exception dsrc.get_data() self.assertEqual(1, m_crawl_metadata.call_count) m_report_failure.assert_called_once_with( description=dsaz.DEFAULT_REPORT_FAILURE_USER_VISIBLE_MESSAGE ) def test_crawl_metadata_exc_should_log_could_not_crawl_msg(self): data = {} dsrc = self._get_ds(data) self.m_is_platform_viable.return_value = True with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: m_crawl_metadata.side_effect = Exception dsrc.get_data() self.assertEqual(1, m_crawl_metadata.call_count) self.assertIn( "Could not crawl Azure metadata", self.logs.getvalue() ) def test_basic_seed_dir(self): odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": {}, } dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertEqual(dsrc.userdata_raw, "") self.assertEqual(dsrc.metadata["local-hostname"], odata["HostName"]) self.assertTrue( os.path.isfile(os.path.join(self.waagent_d, "ovf-env.xml")) ) self.assertEqual("azure", dsrc.cloud_name) self.assertEqual("azure", dsrc.platform_type) self.assertEqual( "seed-dir (%s/seed/azure)" % self.tmp, dsrc.subplatform ) def test_basic_dev_file(self): """When a device path is used, present that in subplatform.""" data = {"sys_cfg": {}, "dsdevs": ["/dev/cd0"]} dsrc = self._get_ds(data) # DSAzure will attempt to mount /dev/sr0 first, which should # fail with mount error since the list of devices doesn't have # /dev/sr0 with mock.patch(MOCKPATH + "util.mount_cb") as m_mount_cb: m_mount_cb.side_effect = [ MountFailedError("fail"), ({"local-hostname": "me"}, "ud", {"cfg": ""}, {}), ] self.assertTrue(dsrc.get_data()) self.assertEqual(dsrc.userdata_raw, "ud") self.assertEqual(dsrc.metadata["local-hostname"], "me") self.assertEqual("azure", dsrc.cloud_name) self.assertEqual("azure", dsrc.platform_type) self.assertEqual("config-disk (/dev/cd0)", dsrc.subplatform) def test_get_data_non_ubuntu_will_not_remove_network_scripts(self): """get_data on non-Ubuntu will not remove ubuntu net scripts.""" odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": {}, } dsrc = self._get_ds(data, distro="debian") dsrc.get_data() self.m_remove_ubuntu_network_scripts.assert_not_called() def test_get_data_on_ubuntu_will_remove_network_scripts(self): """get_data will remove ubuntu net scripts on Ubuntu distro.""" sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } dsrc = self._get_ds(data, distro="ubuntu") dsrc.get_data() self.m_remove_ubuntu_network_scripts.assert_called_once_with() def test_get_data_on_ubuntu_will_not_remove_network_scripts_disabled(self): """When apply_network_config false, do not remove scripts on Ubuntu.""" sys_cfg = {"datasource": {"Azure": {"apply_network_config": False}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } dsrc = self._get_ds(data, distro="ubuntu") dsrc.get_data() self.m_remove_ubuntu_network_scripts.assert_not_called() def test_crawl_metadata_returns_structured_data_and_caches_nothing(self): """Return all structured metadata and cache no class attributes.""" yaml_cfg = "" odata = { "HostName": "myhost", "UserName": "myuser", "UserData": {"text": "FOOBAR", "encoding": "plain"}, "dscfg": {"text": yaml_cfg, "encoding": "plain"}, } data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": {}, } dsrc = self._get_ds(data) expected_cfg = { "PreprovisionedVMType": None, "PreprovisionedVm": False, "datasource": {"Azure": {}}, "system_info": {"default_user": {"name": "myuser"}}, } expected_metadata = { "azure_data": { "configurationsettype": "LinuxProvisioningConfiguration" }, "imds": NETWORK_METADATA, "instance-id": EXAMPLE_UUID, "local-hostname": "myhost", "random_seed": "wild", } crawled_metadata = dsrc.crawl_metadata() self.assertCountEqual( crawled_metadata.keys(), ["cfg", "files", "metadata", "userdata_raw"], ) self.assertEqual(crawled_metadata["cfg"], expected_cfg) self.assertEqual( list(crawled_metadata["files"].keys()), ["ovf-env.xml"] ) self.assertIn( b"myhost", crawled_metadata["files"]["ovf-env.xml"], ) self.assertEqual(crawled_metadata["metadata"], expected_metadata) self.assertEqual(crawled_metadata["userdata_raw"], "FOOBAR") self.assertEqual(dsrc.userdata_raw, None) self.assertEqual(dsrc.metadata, {}) self.assertEqual(dsrc._metadata_imds, UNSET) self.assertFalse( os.path.isfile(os.path.join(self.waagent_d, "ovf-env.xml")) ) def test_crawl_metadata_raises_invalid_metadata_on_error(self): """crawl_metadata raises an exception on invalid ovf-env.xml.""" data = {"ovfcontent": "BOGUS", "sys_cfg": {}} dsrc = self._get_ds(data) error_msg = ( "BrokenAzureDataSource: Invalid ovf-env.xml:" " syntax error: line 1, column 0" ) with self.assertRaises(InvalidMetaDataException) as cm: dsrc.crawl_metadata() self.assertEqual(str(cm.exception), error_msg) def test_crawl_metadata_call_imds_once_no_reprovision(self): """If reprovisioning, report ready at the end""" ovfenv = construct_valid_ovf_env( platform_settings={"PreprovisionedVm": "False"} ) data = {"ovfcontent": ovfenv, "sys_cfg": {}} dsrc = self._get_ds(data) dsrc.crawl_metadata() self.assertEqual(1, self.m_get_metadata_from_imds.call_count) @mock.patch( "cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting" ) @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") @mock.patch( "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" ) @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds") def test_crawl_metadata_call_imds_twice_with_reprovision( self, poll_imds_func, m_report_ready, m_write, m_dhcp ): """If reprovisioning, imds metadata will be fetched twice""" ovfenv = construct_valid_ovf_env( platform_settings={"PreprovisionedVm": "True"} ) data = {"ovfcontent": ovfenv, "sys_cfg": {}} dsrc = self._get_ds(data) poll_imds_func.return_value = ovfenv dsrc.crawl_metadata() self.assertEqual(2, self.m_get_metadata_from_imds.call_count) @mock.patch( "cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting" ) @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") @mock.patch( "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" ) @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds") def test_crawl_metadata_on_reprovision_reports_ready( self, poll_imds_func, m_report_ready, m_write, m_dhcp ): """If reprovisioning, report ready at the end""" ovfenv = construct_valid_ovf_env( platform_settings={"PreprovisionedVm": "True"} ) data = {"ovfcontent": ovfenv, "sys_cfg": {}} dsrc = self._get_ds(data) poll_imds_func.return_value = ovfenv dsrc.crawl_metadata() self.assertEqual(1, m_report_ready.call_count) @mock.patch( "cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting" ) @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") @mock.patch( "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" ) @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds") @mock.patch( "cloudinit.sources.DataSourceAzure.DataSourceAzure." "_wait_for_all_nics_ready" ) def test_crawl_metadata_waits_for_nic_on_savable_vms( self, detect_nics, poll_imds_func, report_ready_func, m_write, m_dhcp ): """If reprovisioning, report ready at the end""" ovfenv = construct_valid_ovf_env( platform_settings={ "PreprovisionedVMType": "Savable", "PreprovisionedVm": "True", } ) data = {"ovfcontent": ovfenv, "sys_cfg": {}} dsrc = self._get_ds(data) poll_imds_func.return_value = ovfenv dsrc.crawl_metadata() self.assertEqual(1, report_ready_func.call_count) self.assertEqual(1, detect_nics.call_count) @mock.patch( "cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting" ) @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") @mock.patch( "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" ) @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds") @mock.patch( "cloudinit.sources.DataSourceAzure.DataSourceAzure." "_wait_for_all_nics_ready" ) @mock.patch("os.path.isfile") def test_detect_nics_when_marker_present( self, is_file, detect_nics, poll_imds_func, report_ready_func, m_write, m_dhcp, ): """If reprovisioning, wait for nic attach if marker present""" def is_file_ret(key): return key == dsaz.REPROVISION_NIC_ATTACH_MARKER_FILE is_file.side_effect = is_file_ret ovfenv = construct_valid_ovf_env() data = {"ovfcontent": ovfenv, "sys_cfg": {}} dsrc = self._get_ds(data) poll_imds_func.return_value = ovfenv dsrc.crawl_metadata() self.assertEqual(1, report_ready_func.call_count) self.assertEqual(1, detect_nics.call_count) @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file") @mock.patch( "cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect" ) @mock.patch( "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready" ) @mock.patch("cloudinit.sources.DataSourceAzure.readurl") def test_crawl_metadata_on_reprovision_reports_ready_using_lease( self, m_readurl, m_report_ready, m_media_switch, m_write ): """If reprovisioning, report ready using the obtained lease""" ovfenv = construct_valid_ovf_env( platform_settings={"PreprovisionedVm": "True"} ) data = {"ovfcontent": ovfenv, "sys_cfg": {}} dsrc = self._get_ds(data) with mock.patch.object( dsrc.distro.networking, "is_up" ) as m_dsrc_distro_networking_is_up: # For this mock, net should not be up, # so that cached ephemeral won't be used. # This is so that a NEW ephemeral dhcp lease will be discovered # and used instead. m_dsrc_distro_networking_is_up.return_value = False lease = { "interface": "eth9", "fixed-address": "192.168.2.9", "routers": "192.168.2.1", "subnet-mask": "255.255.255.0", "unknown-245": "624c3620", } self.m_ephemeral_dhcpv4_with_reporting.return_value.__enter__.return_value = ( # noqa: E501 lease ) m_media_switch.return_value = None reprovision_ovfenv = construct_valid_ovf_env() m_readurl.return_value = url_helper.StringResponse( reprovision_ovfenv.encode("utf-8") ) dsrc.crawl_metadata() self.assertEqual(2, m_report_ready.call_count) m_report_ready.assert_called_with(lease=lease) def test_waagent_d_has_0700_perms(self): # we expect /var/lib/waagent to be created 0700 dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) ret = dsrc.get_data() self.assertTrue(ret) self.assertTrue(os.path.isdir(self.waagent_d)) self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700) @mock.patch( "cloudinit.sources.DataSourceAzure.device_driver", return_value=None ) def test_network_config_set_from_imds(self, m_driver): """Datasource.network_config returns IMDS network data.""" sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } expected_network_config = { "ethernets": { "eth0": { "set-name": "eth0", "match": {"macaddress": "00:0d:3a:04:75:98"}, "dhcp6": False, "dhcp4": True, "dhcp4-overrides": {"route-metric": 100}, } }, "version": 2, } dsrc = self._get_ds(data) dsrc.get_data() self.assertEqual(expected_network_config, dsrc.network_config) @mock.patch( "cloudinit.sources.DataSourceAzure.device_driver", return_value=None ) def test_network_config_set_from_imds_route_metric_for_secondary_nic( self, m_driver ): """Datasource.network_config adds route-metric to secondary nics.""" sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } expected_network_config = { "ethernets": { "eth0": { "set-name": "eth0", "match": {"macaddress": "00:0d:3a:04:75:98"}, "dhcp6": False, "dhcp4": True, "dhcp4-overrides": {"route-metric": 100}, }, "eth1": { "set-name": "eth1", "match": {"macaddress": "22:0d:3a:04:75:98"}, "dhcp6": False, "dhcp4": True, "dhcp4-overrides": {"route-metric": 200}, }, "eth2": { "set-name": "eth2", "match": {"macaddress": "33:0d:3a:04:75:98"}, "dhcp6": False, "dhcp4": True, "dhcp4-overrides": {"route-metric": 300}, }, }, "version": 2, } imds_data = copy.deepcopy(NETWORK_METADATA) imds_data["network"]["interface"].append(SECONDARY_INTERFACE) third_intf = copy.deepcopy(SECONDARY_INTERFACE) third_intf["macAddress"] = third_intf["macAddress"].replace("22", "33") third_intf["ipv4"]["subnet"][0]["address"] = "10.0.2.0" third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6" imds_data["network"]["interface"].append(third_intf) self.m_get_metadata_from_imds.return_value = imds_data dsrc = self._get_ds(data) dsrc.get_data() self.assertEqual(expected_network_config, dsrc.network_config) @mock.patch( "cloudinit.sources.DataSourceAzure.device_driver", return_value=None ) def test_network_config_set_from_imds_for_secondary_nic_no_ip( self, m_driver ): """If an IP address is empty then there should no config for it.""" sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } expected_network_config = { "ethernets": { "eth0": { "set-name": "eth0", "match": {"macaddress": "00:0d:3a:04:75:98"}, "dhcp6": False, "dhcp4": True, "dhcp4-overrides": {"route-metric": 100}, } }, "version": 2, } imds_data = copy.deepcopy(NETWORK_METADATA) imds_data["network"]["interface"].append(SECONDARY_INTERFACE_NO_IP) self.m_get_metadata_from_imds.return_value = imds_data dsrc = self._get_ds(data) dsrc.get_data() self.assertEqual(expected_network_config, dsrc.network_config) def test_availability_zone_set_from_imds(self): """Datasource.availability returns IMDS platformFaultDomain.""" sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) dsrc.get_data() self.assertEqual("0", dsrc.availability_zone) def test_region_set_from_imds(self): """Datasource.region returns IMDS region location.""" sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) dsrc.get_data() self.assertEqual("eastus2", dsrc.region) def test_sys_cfg_set_never_destroy_ntfs(self): sys_cfg = { "datasource": { "Azure": {"never_destroy_ntfs": "user-supplied-value"} } } data = { "ovfcontent": construct_valid_ovf_env(data={}), "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) ret = self._get_and_setup(dsrc) self.assertTrue(ret) self.assertEqual( dsrc.ds_cfg.get(dsaz.DS_CFG_KEY_PRESERVE_NTFS), "user-supplied-value", ) def test_username_used(self): odata = {"HostName": "myhost", "UserName": "myuser"} data = {"ovfcontent": construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertEqual( dsrc.cfg["system_info"]["default_user"]["name"], "myuser" ) def test_password_given(self): odata = { "HostName": "myhost", "UserName": "myuser", "UserPassword": "mypass", } data = {"ovfcontent": construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertIn("default_user", dsrc.cfg["system_info"]) defuser = dsrc.cfg["system_info"]["default_user"] # default user should be updated username and should not be locked. self.assertEqual(defuser["name"], odata["UserName"]) self.assertFalse(defuser["lock_passwd"]) # passwd is crypt formated string $id$salt$encrypted # encrypting plaintext with salt value of everything up to final '$' # should equal that after the '$' pos = defuser["passwd"].rfind("$") + 1 self.assertEqual( defuser["passwd"], crypt.crypt(odata["UserPassword"], defuser["passwd"][0:pos]), ) # the same hashed value should also be present in cfg['password'] self.assertEqual(defuser["passwd"], dsrc.cfg["password"]) def test_user_not_locked_if_password_redacted(self): odata = { "HostName": "myhost", "UserName": "myuser", "UserPassword": dsaz.DEF_PASSWD_REDACTION, } data = {"ovfcontent": construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertIn("default_user", dsrc.cfg["system_info"]) defuser = dsrc.cfg["system_info"]["default_user"] # default user should be updated username and should not be locked. self.assertEqual(defuser["name"], odata["UserName"]) self.assertIn("lock_passwd", defuser) self.assertFalse(defuser["lock_passwd"]) def test_userdata_plain(self): mydata = "FOOBAR" odata = {"UserData": {"text": mydata, "encoding": "plain"}} data = {"ovfcontent": construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertEqual(decode_binary(dsrc.userdata_raw), mydata) def test_userdata_found(self): mydata = "FOOBAR" odata = {"UserData": {"text": b64e(mydata), "encoding": "base64"}} data = {"ovfcontent": construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertEqual(dsrc.userdata_raw, mydata.encode("utf-8")) def test_default_ephemeral_configs_ephemeral_exists(self): # make sure the ephemeral configs are correct if disk present odata = {} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": {}, } orig_exists = dsaz.os.path.exists def changed_exists(path): return ( True if path == dsaz.RESOURCE_DISK_PATH else orig_exists(path) ) with mock.patch(MOCKPATH + "os.path.exists", new=changed_exists): dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) cfg = dsrc.get_config_obj() self.assertEqual( dsrc.device_name_to_device("ephemeral0"), dsaz.RESOURCE_DISK_PATH, ) assert "disk_setup" in cfg assert "fs_setup" in cfg self.assertIsInstance(cfg["disk_setup"], dict) self.assertIsInstance(cfg["fs_setup"], list) def test_default_ephemeral_configs_ephemeral_does_not_exist(self): # make sure the ephemeral configs are correct if disk not present odata = {} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": {}, } orig_exists = dsaz.os.path.exists def changed_exists(path): return ( False if path == dsaz.RESOURCE_DISK_PATH else orig_exists(path) ) with mock.patch(MOCKPATH + "os.path.exists", new=changed_exists): dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) cfg = dsrc.get_config_obj() assert "disk_setup" not in cfg assert "fs_setup" not in cfg def test_provide_disk_aliases(self): # Make sure that user can affect disk aliases dscfg = {"disk_aliases": {"ephemeral0": "/dev/sdc"}} odata = { "HostName": "myhost", "UserName": "myuser", "dscfg": {"text": b64e(yaml.dump(dscfg)), "encoding": "base64"}, } usercfg = { "disk_setup": { "/dev/sdc": {"something": "..."}, "ephemeral0": False, } } userdata = "#cloud-config" + yaml.dump(usercfg) + "\n" ovfcontent = construct_valid_ovf_env(data=odata, userdata=userdata) data = {"ovfcontent": ovfcontent, "sys_cfg": {}} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) cfg = dsrc.get_config_obj() self.assertTrue(cfg) def test_userdata_arrives(self): userdata = "This is my user-data" xml = construct_valid_ovf_env(data={}, userdata=userdata) data = {"ovfcontent": xml} dsrc = self._get_ds(data) dsrc.get_data() self.assertEqual(userdata.encode("us-ascii"), dsrc.userdata_raw) def test_password_redacted_in_ovf(self): odata = { "HostName": "myhost", "UserName": "myuser", "UserPassword": "mypass", } data = {"ovfcontent": construct_valid_ovf_env(data=odata)} dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) ovf_env_path = os.path.join(self.waagent_d, "ovf-env.xml") # The XML should not be same since the user password is redacted on_disk_ovf = load_file(ovf_env_path) self.xml_notequals(data["ovfcontent"], on_disk_ovf) # Make sure that the redacted password on disk is not used by CI self.assertNotEqual( dsrc.cfg.get("password"), dsaz.DEF_PASSWD_REDACTION ) # Make sure that the password was really encrypted et = ET.fromstring(on_disk_ovf) for elem in et.iter(): if "UserPassword" in elem.tag: self.assertEqual(dsaz.DEF_PASSWD_REDACTION, elem.text) def test_ovf_env_arrives_in_waagent_dir(self): xml = construct_valid_ovf_env(data={}, userdata="FOODATA") dsrc = self._get_ds({"ovfcontent": xml}) dsrc.get_data() # 'data_dir' is '/var/lib/waagent' (walinux-agent's state dir) # we expect that the ovf-env.xml file is copied there. ovf_env_path = os.path.join(self.waagent_d, "ovf-env.xml") self.assertTrue(os.path.exists(ovf_env_path)) self.xml_equals(xml, load_file(ovf_env_path)) def test_ovf_can_include_unicode(self): xml = construct_valid_ovf_env(data={}) xml = "\ufeff{0}".format(xml) dsrc = self._get_ds({"ovfcontent": xml}) dsrc.get_data() def test_dsaz_report_ready_returns_true_when_report_succeeds(self): dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) self.assertTrue(dsrc._report_ready(lease=mock.MagicMock())) def test_dsaz_report_ready_returns_false_and_does_not_propagate_exc(self): dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) self.m_get_metadata_from_fabric.side_effect = Exception self.assertFalse(dsrc._report_ready(lease=mock.MagicMock())) def test_dsaz_report_failure_returns_true_when_report_succeeds(self): dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: # mock crawl metadata failure to cause report failure m_crawl_metadata.side_effect = Exception self.assertTrue(dsrc._report_failure()) self.assertEqual(1, self.m_report_failure_to_fabric.call_count) def test_dsaz_report_failure_returns_false_and_does_not_propagate_exc( self, ): dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) with mock.patch.object( dsrc, "crawl_metadata" ) as m_crawl_metadata, mock.patch.object( dsrc, "_ephemeral_dhcp_ctx" ) as m_ephemeral_dhcp_ctx, mock.patch.object( dsrc.distro.networking, "is_up" ) as m_dsrc_distro_networking_is_up: # mock crawl metadata failure to cause report failure m_crawl_metadata.side_effect = Exception # setup mocks to allow using cached ephemeral dhcp lease m_dsrc_distro_networking_is_up.return_value = True test_lease_dhcp_option_245 = "test_lease_dhcp_option_245" test_lease = {"unknown-245": test_lease_dhcp_option_245} m_ephemeral_dhcp_ctx.lease = test_lease # We expect 3 calls to report_failure_to_fabric, # because we try 3 different methods of calling report failure. # The different methods are attempted in the following order: # 1. Using cached ephemeral dhcp context to report failure to Azure # 2. Using new ephemeral dhcp to report failure to Azure # 3. Using fallback lease to report failure to Azure self.m_report_failure_to_fabric.side_effect = Exception self.assertFalse(dsrc._report_failure()) self.assertEqual(3, self.m_report_failure_to_fabric.call_count) def test_dsaz_report_failure_description_msg(self): dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: # mock crawl metadata failure to cause report failure m_crawl_metadata.side_effect = Exception test_msg = "Test report failure description message" self.assertTrue(dsrc._report_failure(description=test_msg)) self.m_report_failure_to_fabric.assert_called_once_with( dhcp_opts=mock.ANY, description=test_msg ) def test_dsaz_report_failure_no_description_msg(self): dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata: m_crawl_metadata.side_effect = Exception self.assertTrue(dsrc._report_failure()) # no description msg self.m_report_failure_to_fabric.assert_called_once_with( dhcp_opts=mock.ANY, description=None ) def test_dsaz_report_failure_uses_cached_ephemeral_dhcp_ctx_lease(self): dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) with mock.patch.object( dsrc, "crawl_metadata" ) as m_crawl_metadata, mock.patch.object( dsrc, "_ephemeral_dhcp_ctx" ) as m_ephemeral_dhcp_ctx, mock.patch.object( dsrc.distro.networking, "is_up" ) as m_dsrc_distro_networking_is_up: # mock crawl metadata failure to cause report failure m_crawl_metadata.side_effect = Exception # setup mocks to allow using cached ephemeral dhcp lease m_dsrc_distro_networking_is_up.return_value = True test_lease_dhcp_option_245 = "test_lease_dhcp_option_245" test_lease = {"unknown-245": test_lease_dhcp_option_245} m_ephemeral_dhcp_ctx.lease = test_lease self.assertTrue(dsrc._report_failure()) # ensure called with cached ephemeral dhcp lease option 245 self.m_report_failure_to_fabric.assert_called_once_with( description=mock.ANY, dhcp_opts=test_lease_dhcp_option_245 ) # ensure cached ephemeral is cleaned self.assertEqual(1, m_ephemeral_dhcp_ctx.clean_network.call_count) def test_dsaz_report_failure_no_net_uses_new_ephemeral_dhcp_lease(self): dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) with mock.patch.object( dsrc, "crawl_metadata" ) as m_crawl_metadata, mock.patch.object( dsrc.distro.networking, "is_up" ) as m_dsrc_distro_networking_is_up: # mock crawl metadata failure to cause report failure m_crawl_metadata.side_effect = Exception # net is not up and cannot use cached ephemeral dhcp m_dsrc_distro_networking_is_up.return_value = False # setup ephemeral dhcp lease discovery mock test_lease_dhcp_option_245 = "test_lease_dhcp_option_245" test_lease = {"unknown-245": test_lease_dhcp_option_245} self.m_ephemeral_dhcpv4_with_reporting.return_value.__enter__.return_value = ( # noqa: E501 test_lease ) self.assertTrue(dsrc._report_failure()) # ensure called with the newly discovered # ephemeral dhcp lease option 245 self.m_report_failure_to_fabric.assert_called_once_with( description=mock.ANY, dhcp_opts=test_lease_dhcp_option_245 ) def test_dsaz_report_failure_no_net_and_no_dhcp_uses_fallback_lease(self): dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) with mock.patch.object( dsrc, "crawl_metadata" ) as m_crawl_metadata, mock.patch.object( dsrc.distro.networking, "is_up" ) as m_dsrc_distro_networking_is_up: # mock crawl metadata failure to cause report failure m_crawl_metadata.side_effect = Exception # net is not up and cannot use cached ephemeral dhcp m_dsrc_distro_networking_is_up.return_value = False # ephemeral dhcp discovery failure, # so cannot use a new ephemeral dhcp self.m_ephemeral_dhcpv4_with_reporting.return_value.__enter__.side_effect = ( # noqa: E501 Exception ) self.assertTrue(dsrc._report_failure()) # ensure called with fallback lease self.m_report_failure_to_fabric.assert_called_once_with( description=mock.ANY, fallback_lease_file=dsrc.dhclient_lease_file, ) def test_exception_fetching_fabric_data_doesnt_propagate(self): """Errors communicating with fabric should warn, but return True.""" dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) self.m_get_metadata_from_fabric.side_effect = Exception ret = self._get_and_setup(dsrc) self.assertTrue(ret) def test_fabric_data_included_in_metadata(self): dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) self.m_get_metadata_from_fabric.return_value = {"test": "value"} ret = self._get_and_setup(dsrc) self.assertTrue(ret) self.assertEqual("value", dsrc.metadata["test"]) def test_instance_id_case_insensitive(self): """Return the previous iid when current is a case-insensitive match.""" lower_iid = EXAMPLE_UUID.lower() upper_iid = EXAMPLE_UUID.upper() # lowercase current UUID ds = self._get_ds( {"ovfcontent": construct_valid_ovf_env()}, instance_id=lower_iid ) # UPPERCASE previous write_file( os.path.join(self.paths.cloud_dir, "data", "instance-id"), upper_iid, ) ds.get_data() self.assertEqual(upper_iid, ds.metadata["instance-id"]) # UPPERCASE current UUID ds = self._get_ds( {"ovfcontent": construct_valid_ovf_env()}, instance_id=upper_iid ) # lowercase previous write_file( os.path.join(self.paths.cloud_dir, "data", "instance-id"), lower_iid, ) ds.get_data() self.assertEqual(lower_iid, ds.metadata["instance-id"]) def test_instance_id_endianness(self): """Return the previous iid when dmi uuid is the byteswapped iid.""" ds = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) # byte-swapped previous write_file( os.path.join(self.paths.cloud_dir, "data", "instance-id"), "544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8", ) ds.get_data() self.assertEqual( "544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8", ds.metadata["instance-id"] ) # not byte-swapped previous write_file( os.path.join(self.paths.cloud_dir, "data", "instance-id"), "644CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8", ) ds.get_data() self.assertEqual(self.instance_id, ds.metadata["instance-id"]) def test_instance_id_from_dmidecode_used(self): ds = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) ds.get_data() self.assertEqual(self.instance_id, ds.metadata["instance-id"]) def test_instance_id_from_dmidecode_used_for_builtin(self): ds = self._get_ds({"ovfcontent": construct_valid_ovf_env()}) ds.get_data() self.assertEqual(self.instance_id, ds.metadata["instance-id"]) @mock.patch(MOCKPATH + "util.is_FreeBSD") @mock.patch(MOCKPATH + "_check_freebsd_cdrom") def test_list_possible_azure_ds(self, m_check_fbsd_cdrom, m_is_FreeBSD): """On FreeBSD, possible devs should show /dev/cd0.""" m_is_FreeBSD.return_value = True m_check_fbsd_cdrom.return_value = True possible_ds = [] for src in dsaz.list_possible_azure_ds("seed_dir", "cache_dir"): possible_ds.append(src) self.assertEqual( possible_ds, [ "seed_dir", dsaz.DEFAULT_PROVISIONING_ISO_DEV, "/dev/cd0", "cache_dir", ], ) self.assertEqual( [mock.call("/dev/cd0")], m_check_fbsd_cdrom.call_args_list ) @mock.patch( "cloudinit.sources.DataSourceAzure.device_driver", return_value=None ) @mock.patch("cloudinit.net.generate_fallback_config") def test_imds_network_config(self, mock_fallback, m_driver): """Network config is generated from IMDS network data when present.""" sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) expected_cfg = { "ethernets": { "eth0": { "dhcp4": True, "dhcp4-overrides": {"route-metric": 100}, "dhcp6": False, "match": {"macaddress": "00:0d:3a:04:75:98"}, "set-name": "eth0", } }, "version": 2, } self.assertEqual(expected_cfg, dsrc.network_config) mock_fallback.assert_not_called() @mock.patch("cloudinit.net.get_interface_mac") @mock.patch("cloudinit.net.get_devicelist") @mock.patch("cloudinit.net.device_driver") @mock.patch("cloudinit.net.generate_fallback_config") def test_imds_network_ignored_when_apply_network_config_false( self, mock_fallback, mock_dd, mock_devlist, mock_get_mac ): """When apply_network_config is False, use fallback instead of IMDS.""" sys_cfg = {"datasource": {"Azure": {"apply_network_config": False}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } fallback_config = { "version": 1, "config": [ { "type": "physical", "name": "eth0", "mac_address": "00:11:22:33:44:55", "params": {"driver": "hv_netsvc"}, "subnets": [{"type": "dhcp"}], } ], } mock_fallback.return_value = fallback_config mock_devlist.return_value = ["eth0"] mock_dd.return_value = ["hv_netsvc"] mock_get_mac.return_value = "00:11:22:33:44:55" dsrc = self._get_ds(data) self.assertTrue(dsrc.get_data()) self.assertEqual(dsrc.network_config, fallback_config) @mock.patch("cloudinit.net.get_interface_mac") @mock.patch("cloudinit.net.get_devicelist") @mock.patch("cloudinit.net.device_driver") @mock.patch("cloudinit.net.generate_fallback_config", autospec=True) def test_fallback_network_config( self, mock_fallback, mock_dd, mock_devlist, mock_get_mac ): """On absent IMDS network data, generate network fallback config.""" odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": {}, } fallback_config = { "version": 1, "config": [ { "type": "physical", "name": "eth0", "mac_address": "00:11:22:33:44:55", "params": {"driver": "hv_netsvc"}, "subnets": [{"type": "dhcp"}], } ], } mock_fallback.return_value = fallback_config mock_devlist.return_value = ["eth0"] mock_dd.return_value = ["hv_netsvc"] mock_get_mac.return_value = "00:11:22:33:44:55" dsrc = self._get_ds(data) # Represent empty response from network imds self.m_get_metadata_from_imds.return_value = {} ret = dsrc.get_data() self.assertTrue(ret) netconfig = dsrc.network_config self.assertEqual(netconfig, fallback_config) mock_fallback.assert_called_with( blacklist_drivers=["mlx4_core", "mlx5_core"], config_driver=True ) @mock.patch(MOCKPATH + "net.get_interfaces", autospec=True) def test_blacklist_through_distro(self, m_net_get_interfaces): """Verify Azure DS updates blacklist drivers in the distro's networking object.""" odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": {}, } distro_cls = distros.fetch("ubuntu") distro = distro_cls("ubuntu", {}, self.paths) dsrc = self._get_ds(data, distro=distro) dsrc.get_data() self.assertEqual( distro.networking.blacklist_drivers, dsaz.BLACKLIST_DRIVERS ) distro.networking.get_interfaces_by_mac() m_net_get_interfaces.assert_called_with( blacklist_drivers=dsaz.BLACKLIST_DRIVERS ) @mock.patch( "cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates" ) def test_get_public_ssh_keys_with_imds(self, m_parse_certificates): sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) dsrc.get_data() dsrc.setup(True) ssh_keys = dsrc.get_public_ssh_keys() self.assertEqual(ssh_keys, ["ssh-rsa key1"]) self.assertEqual(m_parse_certificates.call_count, 0) def test_key_without_crlf_valid(self): test_key = "ssh-rsa somerandomkeystuff some comment" assert True is dsaz._key_is_openssh_formatted(test_key) def test_key_with_crlf_invalid(self): test_key = "ssh-rsa someran\r\ndomkeystuff some comment" assert False is dsaz._key_is_openssh_formatted(test_key) def test_key_endswith_crlf_valid(self): test_key = "ssh-rsa somerandomkeystuff some comment\r\n" assert True is dsaz._key_is_openssh_formatted(test_key) @mock.patch( "cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates" ) @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_get_public_ssh_keys_with_no_openssh_format( self, m_get_metadata_from_imds, m_parse_certificates ): imds_data = copy.deepcopy(NETWORK_METADATA) imds_data["compute"]["publicKeys"][0]["keyData"] = "no-openssh-format" m_get_metadata_from_imds.return_value = imds_data sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) dsrc.get_data() dsrc.setup(True) ssh_keys = dsrc.get_public_ssh_keys() self.assertEqual(ssh_keys, []) self.assertEqual(m_parse_certificates.call_count, 0) @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_get_public_ssh_keys_without_imds(self, m_get_metadata_from_imds): m_get_metadata_from_imds.return_value = dict() sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) dsaz.get_metadata_from_fabric.return_value = {"public-keys": ["key2"]} dsrc.get_data() dsrc.setup(True) ssh_keys = dsrc.get_public_ssh_keys() self.assertEqual(ssh_keys, ["key2"]) @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_imds_api_version_wanted_nonexistent( self, m_get_metadata_from_imds ): def get_metadata_from_imds_side_eff(*args, **kwargs): if kwargs["api_version"] == dsaz.IMDS_VER_WANT: raise url_helper.UrlError("No IMDS version", code=400) return NETWORK_METADATA m_get_metadata_from_imds.side_effect = get_metadata_from_imds_side_eff sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) dsrc.get_data() self.assertIsNotNone(dsrc.metadata) self.assertTrue(dsrc.failed_desired_api_version) @mock.patch( MOCKPATH + "get_metadata_from_imds", return_value=NETWORK_METADATA ) def test_imds_api_version_wanted_exists(self, m_get_metadata_from_imds): sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } dsrc = self._get_ds(data) dsrc.get_data() self.assertIsNotNone(dsrc.metadata) self.assertFalse(dsrc.failed_desired_api_version) @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_hostname_from_imds(self, m_get_metadata_from_imds): sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } imds_data_with_os_profile = copy.deepcopy(NETWORK_METADATA) imds_data_with_os_profile["compute"]["osProfile"] = dict( adminUsername="username1", computerName="hostname1", disablePasswordAuthentication="true", ) m_get_metadata_from_imds.return_value = imds_data_with_os_profile dsrc = self._get_ds(data) dsrc.get_data() self.assertEqual(dsrc.metadata["local-hostname"], "hostname1") @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_username_from_imds(self, m_get_metadata_from_imds): sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } imds_data_with_os_profile = copy.deepcopy(NETWORK_METADATA) imds_data_with_os_profile["compute"]["osProfile"] = dict( adminUsername="username1", computerName="hostname1", disablePasswordAuthentication="true", ) m_get_metadata_from_imds.return_value = imds_data_with_os_profile dsrc = self._get_ds(data) dsrc.get_data() self.assertEqual( dsrc.cfg["system_info"]["default_user"]["name"], "username1" ) @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_disable_password_from_imds(self, m_get_metadata_from_imds): sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } imds_data_with_os_profile = copy.deepcopy(NETWORK_METADATA) imds_data_with_os_profile["compute"]["osProfile"] = dict( adminUsername="username1", computerName="hostname1", disablePasswordAuthentication="true", ) m_get_metadata_from_imds.return_value = imds_data_with_os_profile dsrc = self._get_ds(data) dsrc.get_data() self.assertTrue(dsrc.metadata["disable_password"]) @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_userdata_from_imds(self, m_get_metadata_from_imds): sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} odata = {"HostName": "myhost", "UserName": "myuser"} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } userdata = "userdataImds" imds_data = copy.deepcopy(NETWORK_METADATA) imds_data["compute"]["osProfile"] = dict( adminUsername="username1", computerName="hostname1", disablePasswordAuthentication="true", ) imds_data["compute"]["userData"] = b64e(userdata) m_get_metadata_from_imds.return_value = imds_data dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertEqual(dsrc.userdata_raw, userdata.encode("utf-8")) @mock.patch(MOCKPATH + "get_metadata_from_imds") def test_userdata_from_imds_with_customdata_from_OVF( self, m_get_metadata_from_imds ): userdataOVF = "userdataOVF" odata = { "HostName": "myhost", "UserName": "myuser", "UserData": {"text": b64e(userdataOVF), "encoding": "base64"}, } sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}} data = { "ovfcontent": construct_valid_ovf_env(data=odata), "sys_cfg": sys_cfg, } userdataImds = "userdataImds" imds_data = copy.deepcopy(NETWORK_METADATA) imds_data["compute"]["osProfile"] = dict( adminUsername="username1", computerName="hostname1", disablePasswordAuthentication="true", ) imds_data["compute"]["userData"] = b64e(userdataImds) m_get_metadata_from_imds.return_value = imds_data dsrc = self._get_ds(data) ret = dsrc.get_data() self.assertTrue(ret) self.assertEqual(dsrc.userdata_raw, userdataOVF.encode("utf-8")) class TestLoadAzureDsDir(CiTestCase): """Tests for load_azure_ds_dir.""" def setUp(self): self.source_dir = self.tmp_dir() super(TestLoadAzureDsDir, self).setUp() def test_missing_ovf_env_xml_raises_non_azure_datasource_error(self): """load_azure_ds_dir raises an error When ovf-env.xml doesn't exit.""" with self.assertRaises(dsaz.NonAzureDataSource) as context_manager: dsaz.load_azure_ds_dir(self.source_dir) self.assertEqual( "No ovf-env file found", str(context_manager.exception) ) def test_wb_invalid_ovf_env_xml_calls_read_azure_ovf(self): """load_azure_ds_dir calls read_azure_ovf to parse the xml.""" ovf_path = os.path.join(self.source_dir, "ovf-env.xml") with open(ovf_path, "wb") as stream: stream.write(b"invalid xml") with self.assertRaises(dsaz.BrokenAzureDataSource) as context_manager: dsaz.load_azure_ds_dir(self.source_dir) self.assertEqual( "Invalid ovf-env.xml: syntax error: line 1, column 0", str(context_manager.exception), ) class TestReadAzureOvf(CiTestCase): def test_invalid_xml_raises_non_azure_ds(self): invalid_xml = "" + construct_valid_ovf_env(data={}) self.assertRaises( dsaz.BrokenAzureDataSource, dsaz.read_azure_ovf, invalid_xml ) def test_load_with_pubkeys(self): mypklist = [{"fingerprint": "fp1", "path": "path1", "value": ""}] pubkeys = [(x["fingerprint"], x["path"], x["value"]) for x in mypklist] content = construct_valid_ovf_env(pubkeys=pubkeys) (_md, _ud, cfg) = dsaz.read_azure_ovf(content) for mypk in mypklist: self.assertIn(mypk, cfg["_pubkeys"]) class TestCanDevBeReformatted(CiTestCase): warning_file = "dataloss_warning_readme.txt" def _domock(self, mockpath, sattr=None): patcher = mock.patch(mockpath) setattr(self, sattr, patcher.start()) self.addCleanup(patcher.stop) def patchup(self, devs): bypath = {} for path, data in devs.items(): bypath[path] = data if "realpath" in data: bypath[data["realpath"]] = data for ppath, pdata in data.get("partitions", {}).items(): bypath[ppath] = pdata if "realpath" in data: bypath[pdata["realpath"]] = pdata def realpath(d): return bypath[d].get("realpath", d) def partitions_on_device(devpath): parts = bypath.get(devpath, {}).get("partitions", {}) ret = [] for path, data in parts.items(): ret.append((data.get("num"), realpath(path))) # return sorted by partition number return sorted(ret, key=lambda d: d[0]) def mount_cb(device, callback, mtype, update_env_for_mount): self.assertEqual("ntfs", mtype) self.assertEqual("C", update_env_for_mount.get("LANG")) p = self.tmp_dir() for f in bypath.get(device).get("files", []): write_file(os.path.join(p, f), content=f) return callback(p) def has_ntfs_fs(device): return bypath.get(device, {}).get("fs") == "ntfs" p = MOCKPATH self._domock(p + "_partitions_on_device", "m_partitions_on_device") self._domock(p + "_has_ntfs_filesystem", "m_has_ntfs_filesystem") self._domock(p + "util.mount_cb", "m_mount_cb") self._domock(p + "os.path.realpath", "m_realpath") self._domock(p + "os.path.exists", "m_exists") self._domock(p + "util.SeLinuxGuard", "m_selguard") self.m_exists.side_effect = lambda p: p in bypath self.m_realpath.side_effect = realpath self.m_has_ntfs_filesystem.side_effect = has_ntfs_fs self.m_mount_cb.side_effect = mount_cb self.m_partitions_on_device.side_effect = partitions_on_device self.m_selguard.__enter__ = mock.Mock(return_value=False) self.m_selguard.__exit__ = mock.Mock() def test_three_partitions_is_false(self): """A disk with 3 partitions can not be formatted.""" self.patchup( { "/dev/sda": { "partitions": { "/dev/sda1": {"num": 1}, "/dev/sda2": {"num": 2}, "/dev/sda3": {"num": 3}, } } } ) value, msg = dsaz.can_dev_be_reformatted( "/dev/sda", preserve_ntfs=False ) self.assertFalse(value) self.assertIn("3 or more", msg.lower()) def test_no_partitions_is_false(self): """A disk with no partitions can not be formatted.""" self.patchup({"/dev/sda": {}}) value, msg = dsaz.can_dev_be_reformatted( "/dev/sda", preserve_ntfs=False ) self.assertFalse(value) self.assertIn("not partitioned", msg.lower()) def test_two_partitions_not_ntfs_false(self): """2 partitions and 2nd not ntfs can not be formatted.""" self.patchup( { "/dev/sda": { "partitions": { "/dev/sda1": {"num": 1}, "/dev/sda2": {"num": 2, "fs": "ext4", "files": []}, } } } ) value, msg = dsaz.can_dev_be_reformatted( "/dev/sda", preserve_ntfs=False ) self.assertFalse(value) self.assertIn("not ntfs", msg.lower()) def test_two_partitions_ntfs_populated_false(self): """2 partitions and populated ntfs fs on 2nd can not be formatted.""" self.patchup( { "/dev/sda": { "partitions": { "/dev/sda1": {"num": 1}, "/dev/sda2": { "num": 2, "fs": "ntfs", "files": ["secret.txt"], }, } } } ) value, msg = dsaz.can_dev_be_reformatted( "/dev/sda", preserve_ntfs=False ) self.assertFalse(value) self.assertIn("files on it", msg.lower()) def test_two_partitions_ntfs_empty_is_true(self): """2 partitions and empty ntfs fs on 2nd can be formatted.""" self.patchup( { "/dev/sda": { "partitions": { "/dev/sda1": {"num": 1}, "/dev/sda2": {"num": 2, "fs": "ntfs", "files": []}, } } } ) value, msg = dsaz.can_dev_be_reformatted( "/dev/sda", preserve_ntfs=False ) self.assertTrue(value) self.assertIn("safe for", msg.lower()) def test_one_partition_not_ntfs_false(self): """1 partition witih fs other than ntfs can not be formatted.""" self.patchup( { "/dev/sda": { "partitions": { "/dev/sda1": {"num": 1, "fs": "zfs"}, } } } ) value, msg = dsaz.can_dev_be_reformatted( "/dev/sda", preserve_ntfs=False ) self.assertFalse(value) self.assertIn("not ntfs", msg.lower()) def test_one_partition_ntfs_populated_false(self): """1 mountable ntfs partition with many files can not be formatted.""" self.patchup( { "/dev/sda": { "partitions": { "/dev/sda1": { "num": 1, "fs": "ntfs", "files": ["file1.txt", "file2.exe"], }, } } } ) with mock.patch.object(dsaz.LOG, "warning") as warning: value, msg = dsaz.can_dev_be_reformatted( "/dev/sda", preserve_ntfs=False ) wmsg = warning.call_args[0][0] self.assertIn( "looks like you're using NTFS on the ephemeral disk", wmsg ) self.assertFalse(value) self.assertIn("files on it", msg.lower()) def test_one_partition_ntfs_empty_is_true(self): """1 mountable ntfs partition and no files can be formatted.""" self.patchup( { "/dev/sda": { "partitions": { "/dev/sda1": {"num": 1, "fs": "ntfs", "files": []} } } } ) value, msg = dsaz.can_dev_be_reformatted( "/dev/sda", preserve_ntfs=False ) self.assertTrue(value) self.assertIn("safe for", msg.lower()) def test_one_partition_ntfs_empty_with_dataloss_file_is_true(self): """1 mountable ntfs partition and only warn file can be formatted.""" self.patchup( { "/dev/sda": { "partitions": { "/dev/sda1": { "num": 1, "fs": "ntfs", "files": ["dataloss_warning_readme.txt"], } } } } ) value, msg = dsaz.can_dev_be_reformatted( "/dev/sda", preserve_ntfs=False ) self.assertTrue(value) self.assertIn("safe for", msg.lower()) def test_one_partition_through_realpath_is_true(self): """A symlink to a device with 1 ntfs partition can be formatted.""" epath = "/dev/disk/cloud/azure_resource" self.patchup( { epath: { "realpath": "/dev/sdb", "partitions": { epath + "-part1": { "num": 1, "fs": "ntfs", "files": [self.warning_file], "realpath": "/dev/sdb1", } }, } } ) value, msg = dsaz.can_dev_be_reformatted(epath, preserve_ntfs=False) self.assertTrue(value) self.assertIn("safe for", msg.lower()) def test_three_partition_through_realpath_is_false(self): """A symlink to a device with 3 partitions can not be formatted.""" epath = "/dev/disk/cloud/azure_resource" self.patchup( { epath: { "realpath": "/dev/sdb", "partitions": { epath + "-part1": { "num": 1, "fs": "ntfs", "files": [self.warning_file], "realpath": "/dev/sdb1", }, epath + "-part2": { "num": 2, "fs": "ext3", "realpath": "/dev/sdb2", }, epath + "-part3": { "num": 3, "fs": "ext", "realpath": "/dev/sdb3", }, }, } } ) value, msg = dsaz.can_dev_be_reformatted(epath, preserve_ntfs=False) self.assertFalse(value) self.assertIn("3 or more", msg.lower()) def test_ntfs_mount_errors_true(self): """can_dev_be_reformatted does not fail if NTFS is unknown fstype.""" self.patchup( { "/dev/sda": { "partitions": { "/dev/sda1": {"num": 1, "fs": "ntfs", "files": []} } } } ) error_msgs = [ "Stderr: mount: unknown filesystem type 'ntfs'", # RHEL "Stderr: mount: /dev/sdb1: unknown filesystem type 'ntfs'", # SLES ] for err_msg in error_msgs: self.m_mount_cb.side_effect = MountFailedError( "Failed mounting %s to %s due to: \nUnexpected.\n%s" % ("/dev/sda", "/fake-tmp/dir", err_msg) ) value, msg = dsaz.can_dev_be_reformatted( "/dev/sda", preserve_ntfs=False ) self.assertTrue(value) self.assertIn("cannot mount NTFS, assuming", msg) def test_never_destroy_ntfs_config_false(self): """Normally formattable situation with never_destroy_ntfs set.""" self.patchup( { "/dev/sda": { "partitions": { "/dev/sda1": { "num": 1, "fs": "ntfs", "files": ["dataloss_warning_readme.txt"], } } } } ) value, msg = dsaz.can_dev_be_reformatted( "/dev/sda", preserve_ntfs=True ) self.assertFalse(value) self.assertIn( "config says to never destroy NTFS " "(datasource.Azure.never_destroy_ntfs)", msg, ) class TestClearCachedData(CiTestCase): def test_clear_cached_attrs_clears_imds(self): """All class attributes are reset to defaults, including imds data.""" tmp = self.tmp_dir() paths = helpers.Paths({"cloud_dir": tmp, "run_dir": tmp}) dsrc = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=paths) clean_values = [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds] dsrc.metadata = "md" dsrc.userdata = "ud" dsrc._metadata_imds = "imds" dsrc._dirty_cache = True dsrc.clear_cached_attrs() self.assertEqual( [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds], clean_values ) class TestAzureNetExists(CiTestCase): def test_azure_net_must_exist_for_legacy_objpkl(self): """DataSourceAzureNet must exist for old obj.pkl files that reference it.""" self.assertTrue(hasattr(dsaz, "DataSourceAzureNet")) class TestPreprovisioningReadAzureOvfFlag(CiTestCase): def test_read_azure_ovf_with_true_flag(self): """The read_azure_ovf method should set the PreprovisionedVM cfg flag if the proper setting is present.""" content = construct_valid_ovf_env( platform_settings={"PreprovisionedVm": "True"} ) ret = dsaz.read_azure_ovf(content) cfg = ret[2] self.assertTrue(cfg["PreprovisionedVm"]) def test_read_azure_ovf_with_false_flag(self): """The read_azure_ovf method should set the PreprovisionedVM cfg flag to false if the proper setting is false.""" content = construct_valid_ovf_env( platform_settings={"PreprovisionedVm": "False"} ) ret = dsaz.read_azure_ovf(content) cfg = ret[2] self.assertFalse(cfg["PreprovisionedVm"]) def test_read_azure_ovf_without_flag(self): """The read_azure_ovf method should not set the PreprovisionedVM cfg flag.""" content = construct_valid_ovf_env() ret = dsaz.read_azure_ovf(content) cfg = ret[2] self.assertFalse(cfg["PreprovisionedVm"]) self.assertEqual(None, cfg["PreprovisionedVMType"]) def test_read_azure_ovf_with_running_type(self): """The read_azure_ovf method should set PreprovisionedVMType cfg flag to Running.""" content = construct_valid_ovf_env( platform_settings={ "PreprovisionedVMType": "Running", "PreprovisionedVm": "True", } ) ret = dsaz.read_azure_ovf(content) cfg = ret[2] self.assertTrue(cfg["PreprovisionedVm"]) self.assertEqual("Running", cfg["PreprovisionedVMType"]) def test_read_azure_ovf_with_savable_type(self): """The read_azure_ovf method should set PreprovisionedVMType cfg flag to Savable.""" content = construct_valid_ovf_env( platform_settings={ "PreprovisionedVMType": "Savable", "PreprovisionedVm": "True", } ) ret = dsaz.read_azure_ovf(content) cfg = ret[2] self.assertTrue(cfg["PreprovisionedVm"]) self.assertEqual("Savable", cfg["PreprovisionedVMType"]) @mock.patch("os.path.isfile") class TestPreprovisioningShouldReprovision(CiTestCase): def setUp(self): super(TestPreprovisioningShouldReprovision, self).setUp() tmp = self.tmp_dir() self.waagent_d = self.tmp_path("/var/lib/waagent", tmp) self.paths = helpers.Paths({"cloud_dir": tmp}) dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d @mock.patch(MOCKPATH + "util.write_file") def test__should_reprovision_with_true_cfg(self, isfile, write_f): """The _should_reprovision method should return true with config flag present.""" isfile.return_value = False dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) self.assertTrue( dsa._should_reprovision( (None, None, {"PreprovisionedVm": True}, None) ) ) def test__should_reprovision_with_file_existing(self, isfile): """The _should_reprovision method should return True if the sentinal exists.""" isfile.return_value = True dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) self.assertTrue( dsa._should_reprovision( (None, None, {"preprovisionedvm": False}, None) ) ) def test__should_reprovision_returns_false(self, isfile): """The _should_reprovision method should return False if config and sentinal are not present.""" isfile.return_value = False dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) self.assertFalse(dsa._should_reprovision((None, None, {}, None))) @mock.patch(MOCKPATH + "util.write_file", autospec=True) def test__should_reprovision_uses_imds_md(self, write_file, isfile): """The _should_reprovision method should be able to retrieve the preprovisioning VM type from imds metadata""" isfile.return_value = False dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) self.assertTrue( dsa._should_reprovision( (None, None, {}, None), {"extended": {"compute": {"ppsType": "Running"}}}, ) ) self.assertFalse(dsa._should_reprovision((None, None, {}, None), {})) self.assertFalse( dsa._should_reprovision( (None, None, {}, None), {"extended": {"compute": {"hasCustomData": False}}}, ) ) @mock.patch(MOCKPATH + "DataSourceAzure._poll_imds") def test_reprovision_calls__poll_imds(self, _poll_imds, isfile): """_reprovision will poll IMDS.""" isfile.return_value = False hostname = "myhost" username = "myuser" odata = {"HostName": hostname, "UserName": username} _poll_imds.return_value = construct_valid_ovf_env(data=odata) dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) dsa._reprovision() _poll_imds.assert_called_with() class TestPreprovisioningHotAttachNics(CiTestCase): def setUp(self): super(TestPreprovisioningHotAttachNics, self).setUp() self.tmp = self.tmp_dir() self.waagent_d = self.tmp_path("/var/lib/waagent", self.tmp) self.paths = helpers.Paths({"cloud_dir": self.tmp}) dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d self.paths = helpers.Paths({"cloud_dir": self.tmp}) @mock.patch( "cloudinit.sources.helpers.netlink.wait_for_nic_detach_event", autospec=True, ) @mock.patch(MOCKPATH + "util.write_file", autospec=True) def test_nic_detach_writes_marker(self, m_writefile, m_detach): """When we detect that a nic gets detached, we write a marker for it""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) nl_sock = mock.MagicMock() dsa._wait_for_nic_detach(nl_sock) m_detach.assert_called_with(nl_sock) self.assertEqual(1, m_detach.call_count) m_writefile.assert_called_with( dsaz.REPROVISION_NIC_DETACHED_MARKER_FILE, mock.ANY ) @mock.patch(MOCKPATH + "util.write_file", autospec=True) @mock.patch(MOCKPATH + "DataSourceAzure.fallback_interface") @mock.patch(MOCKPATH + "EphemeralDHCPv4WithReporting") @mock.patch(MOCKPATH + "DataSourceAzure._report_ready") @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") def test_detect_nic_attach_reports_ready_and_waits_for_detach( self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_writefile ): """Report ready first and then wait for nic detach""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) dsa._wait_for_all_nics_ready() m_fallback_if.return_value = "Dummy interface" self.assertEqual(1, m_report_ready.call_count) self.assertEqual(1, m_detach.call_count) self.assertEqual(1, m_writefile.call_count) self.assertEqual(1, m_dhcp.call_count) m_writefile.assert_called_with( dsaz.REPORTED_READY_MARKER_FILE, mock.ANY ) @mock.patch("os.path.isfile") @mock.patch(MOCKPATH + "DataSourceAzure.fallback_interface") @mock.patch(MOCKPATH + "EphemeralDHCPv4WithReporting") @mock.patch(MOCKPATH + "DataSourceAzure._report_ready") @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") def test_detect_nic_attach_skips_report_ready_when_marker_present( self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_isfile ): """Skip reporting ready if we already have a marker file.""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) def isfile(key): return key == dsaz.REPORTED_READY_MARKER_FILE m_isfile.side_effect = isfile dsa._wait_for_all_nics_ready() m_fallback_if.return_value = "Dummy interface" self.assertEqual(0, m_report_ready.call_count) self.assertEqual(0, m_dhcp.call_count) self.assertEqual(1, m_detach.call_count) @mock.patch("os.path.isfile") @mock.patch(MOCKPATH + "DataSourceAzure.fallback_interface") @mock.patch(MOCKPATH + "EphemeralDHCPv4WithReporting") @mock.patch(MOCKPATH + "DataSourceAzure._report_ready") @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") def test_detect_nic_attach_skips_nic_detach_when_marker_present( self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_isfile ): """Skip wait for nic detach if it already happened.""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) m_isfile.return_value = True dsa._wait_for_all_nics_ready() m_fallback_if.return_value = "Dummy interface" self.assertEqual(0, m_report_ready.call_count) self.assertEqual(0, m_dhcp.call_count) self.assertEqual(0, m_detach.call_count) @mock.patch(MOCKPATH + "DataSourceAzure.wait_for_link_up", autospec=True) @mock.patch("cloudinit.sources.helpers.netlink.wait_for_nic_attach_event") @mock.patch("cloudinit.sources.net.find_fallback_nic") @mock.patch(MOCKPATH + "get_metadata_from_imds") @mock.patch(MOCKPATH + "EphemeralDHCPv4") @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") @mock.patch("os.path.isfile") def test_wait_for_nic_attach_if_no_fallback_interface( self, m_isfile, m_detach, m_dhcpv4, m_imds, m_fallback_if, m_attach, m_link_up, ): """Wait for nic attach if we do not have a fallback interface""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) lease = { "interface": "eth9", "fixed-address": "192.168.2.9", "routers": "192.168.2.1", "subnet-mask": "255.255.255.0", "unknown-245": "624c3620", } m_isfile.return_value = True m_attach.return_value = "eth0" dhcp_ctx = mock.MagicMock(lease=lease) dhcp_ctx.obtain_lease.return_value = lease m_dhcpv4.return_value = dhcp_ctx m_imds.return_value = IMDS_NETWORK_METADATA m_fallback_if.return_value = None dsa._wait_for_all_nics_ready() self.assertEqual(0, m_detach.call_count) self.assertEqual(1, m_attach.call_count) self.assertEqual(1, m_dhcpv4.call_count) self.assertEqual(1, m_imds.call_count) self.assertEqual(1, m_link_up.call_count) m_link_up.assert_called_with(mock.ANY, "eth0") @mock.patch(MOCKPATH + "DataSourceAzure.wait_for_link_up") @mock.patch("cloudinit.sources.helpers.netlink.wait_for_nic_attach_event") @mock.patch("cloudinit.sources.net.find_fallback_nic") @mock.patch(MOCKPATH + "DataSourceAzure.get_imds_data_with_api_fallback") @mock.patch(MOCKPATH + "EphemeralDHCPv4") @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach") @mock.patch("os.path.isfile") def test_wait_for_nic_attach_multinic_attach( self, m_isfile, m_detach, m_dhcpv4, m_imds, m_fallback_if, m_attach, m_link_up, ): """Wait for nic attach if we do not have a fallback interface""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) lease = { "interface": "eth9", "fixed-address": "192.168.2.9", "routers": "192.168.2.1", "subnet-mask": "255.255.255.0", "unknown-245": "624c3620", } m_attach_call_count = 0 def nic_attach_ret(nl_sock, nics_found): nonlocal m_attach_call_count m_attach_call_count = m_attach_call_count + 1 if m_attach_call_count == 1: return "eth0" elif m_attach_call_count == 2: return "eth1" raise RuntimeError("Must have found primary nic by now.") # Simulate two NICs by adding the same one twice. md = { "interface": [ IMDS_NETWORK_METADATA["interface"][0], IMDS_NETWORK_METADATA["interface"][0], ] } def network_metadata_ret(ifname, retries, type, exc_cb, infinite): if ifname == "eth0": return md raise requests.Timeout("Fake connection timeout") m_isfile.return_value = True m_attach.side_effect = nic_attach_ret dhcp_ctx = mock.MagicMock(lease=lease) dhcp_ctx.obtain_lease.return_value = lease m_dhcpv4.return_value = dhcp_ctx m_imds.side_effect = network_metadata_ret m_fallback_if.return_value = None dsa._wait_for_all_nics_ready() self.assertEqual(0, m_detach.call_count) self.assertEqual(2, m_attach.call_count) # DHCP and network metadata calls will only happen on the primary NIC. self.assertEqual(1, m_dhcpv4.call_count) self.assertEqual(1, m_imds.call_count) self.assertEqual(2, m_link_up.call_count) @mock.patch(MOCKPATH + "DataSourceAzure.get_imds_data_with_api_fallback") @mock.patch(MOCKPATH + "EphemeralDHCPv4") def test_check_if_nic_is_primary_retries_on_failures( self, m_dhcpv4, m_imds ): """Retry polling for network metadata on all failures except timeout and network unreachable errors""" dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) lease = { "interface": "eth9", "fixed-address": "192.168.2.9", "routers": "192.168.2.1", "subnet-mask": "255.255.255.0", "unknown-245": "624c3620", } eth0Retries = [] eth1Retries = [] # Simulate two NICs by adding the same one twice. md = { "interface": [ IMDS_NETWORK_METADATA["interface"][0], IMDS_NETWORK_METADATA["interface"][0], ] } def network_metadata_ret(ifname, retries, type, exc_cb, infinite): nonlocal eth0Retries, eth1Retries # Simulate readurl functionality with retries and # exception callbacks so that the callback logic can be # validated. if ifname == "eth0": cause = requests.HTTPError() for _ in range(0, 15): error = url_helper.UrlError(cause=cause, code=410) eth0Retries.append(exc_cb("No goal state.", error)) else: for _ in range(0, 10): # We are expected to retry for a certain period for both # timeout errors and network unreachable errors. if _ < 5: cause = requests.Timeout("Fake connection timeout") else: cause = requests.ConnectionError("Network Unreachable") error = url_helper.UrlError(cause=cause) eth1Retries.append(exc_cb("Connection timeout", error)) # Should stop retrying after 10 retries eth1Retries.append(exc_cb("Connection timeout", error)) raise cause return md m_imds.side_effect = network_metadata_ret dhcp_ctx = mock.MagicMock(lease=lease) dhcp_ctx.obtain_lease.return_value = lease m_dhcpv4.return_value = dhcp_ctx is_primary, expected_nic_count = dsa._check_if_nic_is_primary("eth0") self.assertEqual(True, is_primary) self.assertEqual(2, expected_nic_count) # All Eth0 errors are non-timeout errors. So we should have been # retrying indefinitely until success. for i in eth0Retries: self.assertTrue(i) is_primary, expected_nic_count = dsa._check_if_nic_is_primary("eth1") self.assertEqual(False, is_primary) # All Eth1 errors are timeout errors. Retry happens for a max of 10 and # then we should have moved on assuming it is not the primary nic. for i in range(0, 10): self.assertTrue(eth1Retries[i]) self.assertFalse(eth1Retries[10]) @mock.patch("cloudinit.distros.networking.LinuxNetworking.try_set_link_up") def test_wait_for_link_up_returns_if_already_up(self, m_is_link_up): """Waiting for link to be up should return immediately if the link is already up.""" distro_cls = distros.fetch("ubuntu") distro = distro_cls("ubuntu", {}, self.paths) dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths) m_is_link_up.return_value = True dsa.wait_for_link_up("eth0") self.assertEqual(1, m_is_link_up.call_count) @mock.patch(MOCKPATH + "net.is_up", autospec=True) @mock.patch(MOCKPATH + "util.write_file") @mock.patch("cloudinit.net.read_sys_net") @mock.patch("cloudinit.distros.networking.LinuxNetworking.try_set_link_up") def test_wait_for_link_up_checks_link_after_sleep( self, m_try_set_link_up, m_read_sys_net, m_writefile, m_is_up ): """Waiting for link to be up should return immediately if the link is already up.""" distro_cls = distros.fetch("ubuntu") distro = distro_cls("ubuntu", {}, self.paths) dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths) m_try_set_link_up.return_value = False callcount = 0 def is_up_mock(key): nonlocal callcount if callcount == 0: callcount += 1 return False return True m_is_up.side_effect = is_up_mock with mock.patch("cloudinit.sources.DataSourceAzure.sleep"): dsa.wait_for_link_up("eth0") self.assertEqual(2, m_try_set_link_up.call_count) self.assertEqual(2, m_is_up.call_count) @mock.patch(MOCKPATH + "util.write_file") @mock.patch("cloudinit.net.read_sys_net") @mock.patch("cloudinit.distros.networking.LinuxNetworking.try_set_link_up") def test_wait_for_link_up_writes_to_device_file( self, m_is_link_up, m_read_sys_net, m_writefile ): """Waiting for link to be up should return immediately if the link is already up.""" distro_cls = distros.fetch("ubuntu") distro = distro_cls("ubuntu", {}, self.paths) dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths) callcount = 0 def linkup(key): nonlocal callcount if callcount == 0: callcount += 1 return False return True m_is_link_up.side_effect = linkup dsa.wait_for_link_up("eth0") self.assertEqual(2, m_is_link_up.call_count) self.assertEqual(1, m_read_sys_net.call_count) self.assertEqual(2, m_writefile.call_count) @mock.patch( "cloudinit.sources.helpers.netlink.create_bound_netlink_socket" ) def test_wait_for_all_nics_ready_raises_if_socket_fails(self, m_socket): """Waiting for all nics should raise exception if netlink socket creation fails.""" m_socket.side_effect = netlink.NetlinkCreateSocketError distro_cls = distros.fetch("ubuntu") distro = distro_cls("ubuntu", {}, self.paths) dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths) self.assertRaises( netlink.NetlinkCreateSocketError, dsa._wait_for_all_nics_ready ) # dsa._wait_for_all_nics_ready() @mock.patch("cloudinit.net.dhcp.EphemeralIPv4Network") @mock.patch("cloudinit.net.dhcp.maybe_perform_dhcp_discovery") @mock.patch( "cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect" ) @mock.patch("requests.Session.request") @mock.patch(MOCKPATH + "DataSourceAzure._report_ready") class TestPreprovisioningPollIMDS(CiTestCase): def setUp(self): super(TestPreprovisioningPollIMDS, self).setUp() self.tmp = self.tmp_dir() self.waagent_d = self.tmp_path("/var/lib/waagent", self.tmp) self.paths = helpers.Paths({"cloud_dir": self.tmp}) dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d @mock.patch("time.sleep", mock.MagicMock()) @mock.patch(MOCKPATH + "EphemeralDHCPv4") def test_poll_imds_re_dhcp_on_timeout( self, m_dhcpv4, m_report_ready, m_request, m_media_switch, m_dhcp, m_net, ): """The poll_imds will retry DHCP on IMDS timeout.""" report_file = self.tmp_path("report_marker", self.tmp) lease = { "interface": "eth9", "fixed-address": "192.168.2.9", "routers": "192.168.2.1", "subnet-mask": "255.255.255.0", "unknown-245": "624c3620", } m_dhcp.return_value = [lease] m_media_switch.return_value = None dhcp_ctx = mock.MagicMock(lease=lease) dhcp_ctx.obtain_lease.return_value = lease m_dhcpv4.return_value = dhcp_ctx self.tries = 0 def fake_timeout_once(**kwargs): self.tries += 1 if self.tries == 1: raise requests.Timeout("Fake connection timeout") elif self.tries in (2, 3): response = requests.Response() response.status_code = 404 if self.tries == 2 else 410 raise requests.exceptions.HTTPError( "fake {}".format(response.status_code), response=response ) # Third try should succeed and stop retries or redhcp return mock.MagicMock(status_code=200, text="good", content="good") m_request.side_effect = fake_timeout_once dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): dsa._poll_imds() self.assertEqual(m_report_ready.call_count, 1) m_report_ready.assert_called_with(lease=lease) self.assertEqual(3, m_dhcpv4.call_count, "Expected 3 DHCP calls") self.assertEqual(4, self.tries, "Expected 4 total reads from IMDS") @mock.patch("os.path.isfile") def test_poll_imds_skips_dhcp_if_ctx_present( self, m_isfile, report_ready_func, fake_resp, m_media_switch, m_dhcp, m_net, ): """The poll_imds function should reuse the dhcp ctx if it is already present. This happens when we wait for nic to be hot-attached before polling for reprovisiondata. Note that if this ctx is set when _poll_imds is called, then it is not expected to be waiting for media_disconnect_connect either.""" report_file = self.tmp_path("report_marker", self.tmp) m_isfile.return_value = True dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) dsa._ephemeral_dhcp_ctx = "Dummy dhcp ctx" with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): dsa._poll_imds() self.assertEqual(0, m_dhcp.call_count) self.assertEqual(0, m_media_switch.call_count) @mock.patch("os.path.isfile") @mock.patch(MOCKPATH + "EphemeralDHCPv4") def test_poll_imds_does_dhcp_on_retries_if_ctx_present( self, m_ephemeral_dhcpv4, m_isfile, report_ready_func, m_request, m_media_switch, m_dhcp, m_net, ): """The poll_imds function should reuse the dhcp ctx if it is already present. This happens when we wait for nic to be hot-attached before polling for reprovisiondata. Note that if this ctx is set when _poll_imds is called, then it is not expected to be waiting for media_disconnect_connect either.""" tries = 0 def fake_timeout_once(**kwargs): nonlocal tries tries += 1 if tries == 1: raise requests.Timeout("Fake connection timeout") return mock.MagicMock(status_code=200, text="good", content="good") m_request.side_effect = fake_timeout_once report_file = self.tmp_path("report_marker", self.tmp) m_isfile.return_value = True dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) with mock.patch( MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file ), mock.patch.object(dsa, "_ephemeral_dhcp_ctx") as m_dhcp_ctx: m_dhcp_ctx.obtain_lease.return_value = "Dummy lease" dsa._ephemeral_dhcp_ctx = m_dhcp_ctx dsa._poll_imds() self.assertEqual(1, m_dhcp_ctx.clean_network.call_count) self.assertEqual(1, m_ephemeral_dhcpv4.call_count) self.assertEqual(0, m_media_switch.call_count) self.assertEqual(2, m_request.call_count) def test_does_not_poll_imds_report_ready_when_marker_file_exists( self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net ): """poll_imds should not call report ready when the reported ready marker file exists""" report_file = self.tmp_path("report_marker", self.tmp) write_file(report_file, content="dont run report_ready :)") m_dhcp.return_value = [ { "interface": "eth9", "fixed-address": "192.168.2.9", "routers": "192.168.2.1", "subnet-mask": "255.255.255.0", "unknown-245": "624c3620", } ] m_media_switch.return_value = None dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): dsa._poll_imds() self.assertEqual(m_report_ready.call_count, 0) def test_poll_imds_report_ready_success_writes_marker_file( self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net ): """poll_imds should write the report_ready marker file if reporting ready succeeds""" report_file = self.tmp_path("report_marker", self.tmp) m_dhcp.return_value = [ { "interface": "eth9", "fixed-address": "192.168.2.9", "routers": "192.168.2.1", "subnet-mask": "255.255.255.0", "unknown-245": "624c3620", } ] m_media_switch.return_value = None dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) self.assertFalse(os.path.exists(report_file)) with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): dsa._poll_imds() self.assertEqual(m_report_ready.call_count, 1) self.assertTrue(os.path.exists(report_file)) def test_poll_imds_report_ready_failure_raises_exc_and_doesnt_write_marker( self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net ): """poll_imds should write the report_ready marker file if reporting ready succeeds""" report_file = self.tmp_path("report_marker", self.tmp) m_dhcp.return_value = [ { "interface": "eth9", "fixed-address": "192.168.2.9", "routers": "192.168.2.1", "subnet-mask": "255.255.255.0", "unknown-245": "624c3620", } ] m_media_switch.return_value = None m_report_ready.return_value = False dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths) self.assertFalse(os.path.exists(report_file)) with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file): self.assertRaises(InvalidMetaDataException, dsa._poll_imds) self.assertEqual(m_report_ready.call_count, 1) self.assertFalse(os.path.exists(report_file)) @mock.patch(MOCKPATH + "DataSourceAzure._report_ready", mock.MagicMock()) @mock.patch(MOCKPATH + "subp.subp", mock.MagicMock()) @mock.patch(MOCKPATH + "util.write_file", mock.MagicMock()) @mock.patch( "cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect" ) @mock.patch("cloudinit.net.dhcp.EphemeralIPv4Network", autospec=True) @mock.patch("cloudinit.net.dhcp.maybe_perform_dhcp_discovery") @mock.patch("requests.Session.request") class TestAzureDataSourcePreprovisioning(CiTestCase): def setUp(self): super(TestAzureDataSourcePreprovisioning, self).setUp() tmp = self.tmp_dir() self.waagent_d = self.tmp_path("/var/lib/waagent", tmp) self.paths = helpers.Paths({"cloud_dir": tmp}) dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d def test_poll_imds_returns_ovf_env( self, m_request, m_dhcp, m_net, m_media_switch ): """The _poll_imds method should return the ovf_env.xml.""" m_media_switch.return_value = None m_dhcp.return_value = [ { "interface": "eth9", "fixed-address": "192.168.2.9", "routers": "192.168.2.1", "subnet-mask": "255.255.255.0", } ] url = "http://{0}/metadata/reprovisiondata?api-version=2019-06-01" host = "169.254.169.254" full_url = url.format(host) m_request.return_value = mock.MagicMock( status_code=200, text="ovf", content="ovf" ) dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) self.assertTrue(len(dsa._poll_imds()) > 0) self.assertEqual( m_request.call_args_list, [ mock.call( allow_redirects=True, headers={ "Metadata": "true", "User-Agent": "Cloud-Init/%s" % vs(), }, method="GET", timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS, url=full_url, ) ], ) self.assertEqual(m_dhcp.call_count, 2) m_net.assert_any_call( broadcast="192.168.2.255", interface="eth9", ip="192.168.2.9", prefix_or_mask="255.255.255.0", router="192.168.2.1", static_routes=None, ) self.assertEqual(m_net.call_count, 2) def test__reprovision_calls__poll_imds( self, m_request, m_dhcp, m_net, m_media_switch ): """The _reprovision method should call poll IMDS.""" m_media_switch.return_value = None m_dhcp.return_value = [ { "interface": "eth9", "fixed-address": "192.168.2.9", "routers": "192.168.2.1", "subnet-mask": "255.255.255.0", "unknown-245": "624c3620", } ] url = "http://{0}/metadata/reprovisiondata?api-version=2019-06-01" host = "169.254.169.254" full_url = url.format(host) hostname = "myhost" username = "myuser" odata = {"HostName": hostname, "UserName": username} content = construct_valid_ovf_env(data=odata) m_request.return_value = mock.MagicMock( status_code=200, text=content, content=content ) dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths) md, _ud, cfg, _d = dsa._reprovision() self.assertEqual(md["local-hostname"], hostname) self.assertEqual(cfg["system_info"]["default_user"]["name"], username) self.assertIn( mock.call( allow_redirects=True, headers={ "Metadata": "true", "User-Agent": "Cloud-Init/%s" % vs(), }, method="GET", timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS, url=full_url, ), m_request.call_args_list, ) self.assertEqual(m_dhcp.call_count, 2) m_net.assert_any_call( broadcast="192.168.2.255", interface="eth9", ip="192.168.2.9", prefix_or_mask="255.255.255.0", router="192.168.2.1", static_routes=None, ) self.assertEqual(m_net.call_count, 2) class TestRemoveUbuntuNetworkConfigScripts(CiTestCase): with_logs = True def setUp(self): super(TestRemoveUbuntuNetworkConfigScripts, self).setUp() self.tmp = self.tmp_dir() def test_remove_network_scripts_removes_both_files_and_directories(self): """Any files or directories in paths are removed when present.""" file1 = self.tmp_path("file1", dir=self.tmp) subdir = self.tmp_path("sub1", dir=self.tmp) subfile = self.tmp_path("leaf1", dir=subdir) write_file(file1, "file1content") write_file(subfile, "leafcontent") dsaz.maybe_remove_ubuntu_network_config_scripts(paths=[subdir, file1]) for path in (file1, subdir, subfile): self.assertFalse( os.path.exists(path), "Found unremoved: %s" % path ) expected_logs = [ "INFO: Removing Ubuntu extended network scripts because cloud-init" " updates Azure network configuration on the following events:" " ['boot', 'boot-legacy']", "Recursively deleting %s" % subdir, "Attempting to remove %s" % file1, ] for log in expected_logs: self.assertIn(log, self.logs.getvalue()) def test_remove_network_scripts_only_attempts_removal_if_path_exists(self): """Any files or directories absent are skipped without error.""" dsaz.maybe_remove_ubuntu_network_config_scripts( paths=[ self.tmp_path("nodirhere/", dir=self.tmp), self.tmp_path("notfilehere", dir=self.tmp), ] ) self.assertNotIn("/not/a", self.logs.getvalue()) # No delete logs @mock.patch(MOCKPATH + "os.path.exists") def test_remove_network_scripts_default_removes_stock_scripts( self, m_exists ): """Azure's stock ubuntu image scripts and artifacts are removed.""" # Report path absent on all to avoid delete operation m_exists.return_value = False dsaz.maybe_remove_ubuntu_network_config_scripts() calls = m_exists.call_args_list for path in dsaz.UBUNTU_EXTENDED_NETWORK_SCRIPTS: self.assertIn(mock.call(path), calls) class TestWBIsPlatformViable(CiTestCase): """White box tests for _is_platform_viable.""" with_logs = True @mock.patch(MOCKPATH + "dmi.read_dmi_data") def test_true_on_non_azure_chassis(self, m_read_dmi_data): """Return True if DMI chassis-asset-tag is AZURE_CHASSIS_ASSET_TAG.""" m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG self.assertTrue(dsaz._is_platform_viable("doesnotmatter")) @mock.patch(MOCKPATH + "os.path.exists") @mock.patch(MOCKPATH + "dmi.read_dmi_data") def test_true_on_azure_ovf_env_in_seed_dir(self, m_read_dmi_data, m_exist): """Return True if ovf-env.xml exists in known seed dirs.""" # Non-matching Azure chassis-asset-tag m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG + "X" m_exist.return_value = True self.assertTrue(dsaz._is_platform_viable("/some/seed/dir")) m_exist.called_once_with("/other/seed/dir") def test_false_on_no_matching_azure_criteria(self): """Report non-azure on unmatched asset tag, ovf-env absent and no dev. Return False when the asset tag doesn't match Azure's static AZURE_CHASSIS_ASSET_TAG, no ovf-env.xml files exist in known seed dirs and no devices have a label starting with prefix 'rd_rdfe_'. """ self.assertFalse( wrap_and_call( MOCKPATH, { "os.path.exists": False, # Non-matching Azure chassis-asset-tag "dmi.read_dmi_data": dsaz.AZURE_CHASSIS_ASSET_TAG + "X", "subp.which": None, }, dsaz._is_platform_viable, "doesnotmatter", ) ) self.assertIn( "DEBUG: Non-Azure DMI asset tag '{0}' discovered.\n".format( dsaz.AZURE_CHASSIS_ASSET_TAG + "X" ), self.logs.getvalue(), ) class TestRandomSeed(CiTestCase): """Test proper handling of random_seed""" def test_non_ascii_seed_is_serializable(self): """Pass if a random string from the Azure infrastructure which contains at least one non-Unicode character can be converted to/from JSON without alteration and without throwing an exception. """ path = resourceLocation("azure/non_unicode_random_string") result = dsaz._get_random_seed(path) obj = {"seed": result} try: serialized = json_dumps(obj) deserialized = load_json(serialized) except UnicodeDecodeError: self.fail("Non-serializable random seed returned") self.assertEqual(deserialized["seed"], result) # vi: ts=4 expandtab