summaryrefslogtreecommitdiff
path: root/tests/unittests/sources/test_azure.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/sources/test_azure.py')
-rw-r--r--tests/unittests/sources/test_azure.py3174
1 files changed, 1833 insertions, 1341 deletions
diff --git a/tests/unittests/sources/test_azure.py b/tests/unittests/sources/test_azure.py
index ad8be04b..8b0762b7 100644
--- a/tests/unittests/sources/test_azure.py
+++ b/tests/unittests/sources/test_azure.py
@@ -1,33 +1,47 @@
# This file is part of cloud-init. See LICENSE file for license information.
-from cloudinit import distros
-from cloudinit import helpers
-from cloudinit import url_helper
-from cloudinit.sources import (
- UNSET, DataSourceAzure as dsaz, InvalidMetaDataException)
-from cloudinit.util import (b64e, decode_binary, load_file, write_file,
- MountFailedError, json_dumps, load_json)
-from cloudinit.version import version_string as vs
-from tests.unittests.helpers import (
- HttprettyTestCase, CiTestCase, populate_dir, mock, wrap_and_call,
- ExitStack, resourceLocation)
-from cloudinit.sources.helpers import netlink
-
import copy
import crypt
-import httpretty
import json
import os
-import requests
import stat
import xml.etree.ElementTree as ET
-import yaml
+import httpretty
+import requests
+import yaml
-def construct_valid_ovf_env(data=None, pubkeys=None,
- userdata=None, platform_settings=None):
+from cloudinit import distros, helpers, url_helper
+from cloudinit.sources import UNSET
+from cloudinit.sources import DataSourceAzure as dsaz
+from cloudinit.sources import InvalidMetaDataException
+from cloudinit.sources.helpers import netlink
+from cloudinit.util import (
+ MountFailedError,
+ b64e,
+ decode_binary,
+ json_dumps,
+ load_file,
+ load_json,
+ write_file,
+)
+from cloudinit.version import version_string as vs
+from tests.unittests.helpers import (
+ CiTestCase,
+ ExitStack,
+ HttprettyTestCase,
+ mock,
+ populate_dir,
+ resourceLocation,
+ wrap_and_call,
+)
+
+
+def construct_valid_ovf_env(
+ data=None, pubkeys=None, userdata=None, platform_settings=None
+):
if data is None:
- data = {'HostName': 'FOOHOST'}
+ data = {"HostName": "FOOHOST"}
if pubkeys is None:
pubkeys = {}
@@ -45,9 +59,14 @@ def construct_valid_ovf_env(data=None, pubkeys=None,
"""
for key, dval in data.items():
if isinstance(dval, dict):
- val = dict(dval).get('text')
- attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v
- in dict(dval).items() if k != 'text'])
+ val = dict(dval).get("text")
+ attrs = " " + " ".join(
+ [
+ "%s='%s'" % (k, v)
+ for k, v in dict(dval).items()
+ if k != "text"
+ ]
+ )
else:
val = dval
attrs = ""
@@ -61,8 +80,10 @@ def construct_valid_ovf_env(data=None, pubkeys=None,
for fp, path, value in pubkeys:
content += " <PublicKey>"
if fp and path:
- content += ("<Fingerprint>%s</Fingerprint><Path>%s</Path>" %
- (fp, path))
+ content += "<Fingerprint>%s</Fingerprint><Path>%s</Path>" % (
+ fp,
+ path,
+ )
if value:
content += "<Value>%s</Value>" % value
content += "</PublicKey>\n"
@@ -106,300 +127,331 @@ NETWORK_METADATA = {
"vmScaleSetName": "",
"vmSize": "Standard_DS1_v2",
"zone": "",
- "publicKeys": [
- {
- "keyData": "ssh-rsa key1",
- "path": "path1"
- }
- ]
+ "publicKeys": [{"keyData": "ssh-rsa key1", "path": "path1"}],
},
"network": {
"interface": [
{
"macAddress": "000D3A047598",
- "ipv6": {
- "ipAddress": []
- },
+ "ipv6": {"ipAddress": []},
"ipv4": {
- "subnet": [
- {
- "prefix": "24",
- "address": "10.0.0.0"
- }
- ],
+ "subnet": [{"prefix": "24", "address": "10.0.0.0"}],
"ipAddress": [
{
"privateIpAddress": "10.0.0.4",
- "publicIpAddress": "104.46.124.81"
+ "publicIpAddress": "104.46.124.81",
}
- ]
- }
+ ],
+ },
}
]
- }
+ },
}
SECONDARY_INTERFACE = {
"macAddress": "220D3A047598",
- "ipv6": {
- "ipAddress": []
- },
+ "ipv6": {"ipAddress": []},
"ipv4": {
- "subnet": [
- {
- "prefix": "24",
- "address": "10.0.1.0"
- }
- ],
+ "subnet": [{"prefix": "24", "address": "10.0.1.0"}],
"ipAddress": [
{
"privateIpAddress": "10.0.1.5",
}
- ]
- }
+ ],
+ },
}
SECONDARY_INTERFACE_NO_IP = {
"macAddress": "220D3A047598",
- "ipv6": {
- "ipAddress": []
- },
+ "ipv6": {"ipAddress": []},
"ipv4": {
- "subnet": [
- {
- "prefix": "24",
- "address": "10.0.1.0"
- }
- ],
- "ipAddress": []
- }
+ "subnet": [{"prefix": "24", "address": "10.0.1.0"}],
+ "ipAddress": [],
+ },
}
IMDS_NETWORK_METADATA = {
"interface": [
{
"macAddress": "000D3A047598",
- "ipv6": {
- "ipAddress": []
- },
+ "ipv6": {"ipAddress": []},
"ipv4": {
- "subnet": [
- {
- "prefix": "24",
- "address": "10.0.0.0"
- }
- ],
+ "subnet": [{"prefix": "24", "address": "10.0.0.0"}],
"ipAddress": [
{
"privateIpAddress": "10.0.0.4",
- "publicIpAddress": "104.46.124.81"
+ "publicIpAddress": "104.46.124.81",
}
- ]
- }
+ ],
+ },
}
]
}
-MOCKPATH = 'cloudinit.sources.DataSourceAzure.'
-EXAMPLE_UUID = 'd0df4c54-4ecb-4a4b-9954-5bdf3ed5c3b8'
+MOCKPATH = "cloudinit.sources.DataSourceAzure."
+EXAMPLE_UUID = "d0df4c54-4ecb-4a4b-9954-5bdf3ed5c3b8"
class TestParseNetworkConfig(CiTestCase):
maxDiff = None
fallback_config = {
- 'version': 1,
- 'config': [{
- 'type': 'physical', 'name': 'eth0',
- 'mac_address': '00:11:22:33:44:55',
- 'params': {'driver': 'hv_netsvc'},
- 'subnets': [{'type': 'dhcp'}],
- }]
+ "version": 1,
+ "config": [
+ {
+ "type": "physical",
+ "name": "eth0",
+ "mac_address": "00:11:22:33:44:55",
+ "params": {"driver": "hv_netsvc"},
+ "subnets": [{"type": "dhcp"}],
+ }
+ ],
}
- @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
- return_value=None)
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
+ )
def test_single_ipv4_nic_configuration(self, m_driver):
"""parse_network_config emits dhcp on single nic with ipv4"""
- expected = {'ethernets': {
- 'eth0': {'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 100},
- 'dhcp6': False,
- 'match': {'macaddress': '00:0d:3a:04:75:98'},
- 'set-name': 'eth0'}}, 'version': 2}
+ expected = {
+ "ethernets": {
+ "eth0": {
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ "dhcp6": False,
+ "match": {"macaddress": "00:0d:3a:04:75:98"},
+ "set-name": "eth0",
+ }
+ },
+ "version": 2,
+ }
self.assertEqual(expected, dsaz.parse_network_config(NETWORK_METADATA))
- @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
- return_value=None)
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
+ )
def test_increases_route_metric_for_non_primary_nics(self, m_driver):
"""parse_network_config increases route-metric for each nic"""
- expected = {'ethernets': {
- 'eth0': {'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 100},
- 'dhcp6': False,
- 'match': {'macaddress': '00:0d:3a:04:75:98'},
- 'set-name': 'eth0'},
- 'eth1': {'set-name': 'eth1',
- 'match': {'macaddress': '22:0d:3a:04:75:98'},
- 'dhcp6': False,
- 'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 200}},
- 'eth2': {'set-name': 'eth2',
- 'match': {'macaddress': '33:0d:3a:04:75:98'},
- 'dhcp6': False,
- 'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 300}}}, 'version': 2}
+ expected = {
+ "ethernets": {
+ "eth0": {
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ "dhcp6": False,
+ "match": {"macaddress": "00:0d:3a:04:75:98"},
+ "set-name": "eth0",
+ },
+ "eth1": {
+ "set-name": "eth1",
+ "match": {"macaddress": "22:0d:3a:04:75:98"},
+ "dhcp6": False,
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 200},
+ },
+ "eth2": {
+ "set-name": "eth2",
+ "match": {"macaddress": "33:0d:3a:04:75:98"},
+ "dhcp6": False,
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 300},
+ },
+ },
+ "version": 2,
+ }
imds_data = copy.deepcopy(NETWORK_METADATA)
- imds_data['network']['interface'].append(SECONDARY_INTERFACE)
+ imds_data["network"]["interface"].append(SECONDARY_INTERFACE)
third_intf = copy.deepcopy(SECONDARY_INTERFACE)
- third_intf['macAddress'] = third_intf['macAddress'].replace('22', '33')
- third_intf['ipv4']['subnet'][0]['address'] = '10.0.2.0'
- third_intf['ipv4']['ipAddress'][0]['privateIpAddress'] = '10.0.2.6'
- imds_data['network']['interface'].append(third_intf)
+ third_intf["macAddress"] = third_intf["macAddress"].replace("22", "33")
+ third_intf["ipv4"]["subnet"][0]["address"] = "10.0.2.0"
+ third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6"
+ imds_data["network"]["interface"].append(third_intf)
self.assertEqual(expected, dsaz.parse_network_config(imds_data))
- @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
- return_value=None)
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
+ )
def test_ipv4_and_ipv6_route_metrics_match_for_nics(self, m_driver):
"""parse_network_config emits matching ipv4 and ipv6 route-metrics."""
- expected = {'ethernets': {
- 'eth0': {'addresses': ['10.0.0.5/24', '2001:dead:beef::2/128'],
- 'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 100},
- 'dhcp6': True,
- 'dhcp6-overrides': {'route-metric': 100},
- 'match': {'macaddress': '00:0d:3a:04:75:98'},
- 'set-name': 'eth0'},
- 'eth1': {'set-name': 'eth1',
- 'match': {'macaddress': '22:0d:3a:04:75:98'},
- 'dhcp4': True,
- 'dhcp6': False,
- 'dhcp4-overrides': {'route-metric': 200}},
- 'eth2': {'set-name': 'eth2',
- 'match': {'macaddress': '33:0d:3a:04:75:98'},
- 'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 300},
- 'dhcp6': True,
- 'dhcp6-overrides': {'route-metric': 300}}}, 'version': 2}
+ expected = {
+ "ethernets": {
+ "eth0": {
+ "addresses": ["10.0.0.5/24", "2001:dead:beef::2/128"],
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ "dhcp6": True,
+ "dhcp6-overrides": {"route-metric": 100},
+ "match": {"macaddress": "00:0d:3a:04:75:98"},
+ "set-name": "eth0",
+ },
+ "eth1": {
+ "set-name": "eth1",
+ "match": {"macaddress": "22:0d:3a:04:75:98"},
+ "dhcp4": True,
+ "dhcp6": False,
+ "dhcp4-overrides": {"route-metric": 200},
+ },
+ "eth2": {
+ "set-name": "eth2",
+ "match": {"macaddress": "33:0d:3a:04:75:98"},
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 300},
+ "dhcp6": True,
+ "dhcp6-overrides": {"route-metric": 300},
+ },
+ },
+ "version": 2,
+ }
imds_data = copy.deepcopy(NETWORK_METADATA)
- nic1 = imds_data['network']['interface'][0]
- nic1['ipv4']['ipAddress'].append({'privateIpAddress': '10.0.0.5'})
+ nic1 = imds_data["network"]["interface"][0]
+ nic1["ipv4"]["ipAddress"].append({"privateIpAddress": "10.0.0.5"})
- nic1['ipv6'] = {
+ nic1["ipv6"] = {
"subnet": [{"address": "2001:dead:beef::16"}],
- "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"},
- {"privateIpAddress": "2001:dead:beef::2"}]
+ "ipAddress": [
+ {"privateIpAddress": "2001:dead:beef::1"},
+ {"privateIpAddress": "2001:dead:beef::2"},
+ ],
}
- imds_data['network']['interface'].append(SECONDARY_INTERFACE)
+ imds_data["network"]["interface"].append(SECONDARY_INTERFACE)
third_intf = copy.deepcopy(SECONDARY_INTERFACE)
- third_intf['macAddress'] = third_intf['macAddress'].replace('22', '33')
- third_intf['ipv4']['subnet'][0]['address'] = '10.0.2.0'
- third_intf['ipv4']['ipAddress'][0]['privateIpAddress'] = '10.0.2.6'
- third_intf['ipv6'] = {
+ third_intf["macAddress"] = third_intf["macAddress"].replace("22", "33")
+ third_intf["ipv4"]["subnet"][0]["address"] = "10.0.2.0"
+ third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6"
+ third_intf["ipv6"] = {
"subnet": [{"prefix": "64", "address": "2001:dead:beef::2"}],
- "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}]
+ "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}],
}
- imds_data['network']['interface'].append(third_intf)
+ imds_data["network"]["interface"].append(third_intf)
self.assertEqual(expected, dsaz.parse_network_config(imds_data))
- @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
- return_value=None)
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
+ )
def test_ipv4_secondary_ips_will_be_static_addrs(self, m_driver):
"""parse_network_config emits primary ipv4 as dhcp others are static"""
- expected = {'ethernets': {
- 'eth0': {'addresses': ['10.0.0.5/24'],
- 'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 100},
- 'dhcp6': True,
- 'dhcp6-overrides': {'route-metric': 100},
- 'match': {'macaddress': '00:0d:3a:04:75:98'},
- 'set-name': 'eth0'}}, 'version': 2}
+ expected = {
+ "ethernets": {
+ "eth0": {
+ "addresses": ["10.0.0.5/24"],
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ "dhcp6": True,
+ "dhcp6-overrides": {"route-metric": 100},
+ "match": {"macaddress": "00:0d:3a:04:75:98"},
+ "set-name": "eth0",
+ }
+ },
+ "version": 2,
+ }
imds_data = copy.deepcopy(NETWORK_METADATA)
- nic1 = imds_data['network']['interface'][0]
- nic1['ipv4']['ipAddress'].append({'privateIpAddress': '10.0.0.5'})
+ nic1 = imds_data["network"]["interface"][0]
+ nic1["ipv4"]["ipAddress"].append({"privateIpAddress": "10.0.0.5"})
- nic1['ipv6'] = {
+ nic1["ipv6"] = {
"subnet": [{"prefix": "10", "address": "2001:dead:beef::16"}],
- "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}]
+ "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"}],
}
self.assertEqual(expected, dsaz.parse_network_config(imds_data))
- @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
- return_value=None)
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
+ )
def test_ipv6_secondary_ips_will_be_static_cidrs(self, m_driver):
"""parse_network_config emits primary ipv6 as dhcp others are static"""
- expected = {'ethernets': {
- 'eth0': {'addresses': ['10.0.0.5/24', '2001:dead:beef::2/10'],
- 'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 100},
- 'dhcp6': True,
- 'dhcp6-overrides': {'route-metric': 100},
- 'match': {'macaddress': '00:0d:3a:04:75:98'},
- 'set-name': 'eth0'}}, 'version': 2}
+ expected = {
+ "ethernets": {
+ "eth0": {
+ "addresses": ["10.0.0.5/24", "2001:dead:beef::2/10"],
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ "dhcp6": True,
+ "dhcp6-overrides": {"route-metric": 100},
+ "match": {"macaddress": "00:0d:3a:04:75:98"},
+ "set-name": "eth0",
+ }
+ },
+ "version": 2,
+ }
imds_data = copy.deepcopy(NETWORK_METADATA)
- nic1 = imds_data['network']['interface'][0]
- nic1['ipv4']['ipAddress'].append({'privateIpAddress': '10.0.0.5'})
+ nic1 = imds_data["network"]["interface"][0]
+ nic1["ipv4"]["ipAddress"].append({"privateIpAddress": "10.0.0.5"})
# Secondary ipv6 addresses currently ignored/unconfigured
- nic1['ipv6'] = {
+ nic1["ipv6"] = {
"subnet": [{"prefix": "10", "address": "2001:dead:beef::16"}],
- "ipAddress": [{"privateIpAddress": "2001:dead:beef::1"},
- {"privateIpAddress": "2001:dead:beef::2"}]
+ "ipAddress": [
+ {"privateIpAddress": "2001:dead:beef::1"},
+ {"privateIpAddress": "2001:dead:beef::2"},
+ ],
}
self.assertEqual(expected, dsaz.parse_network_config(imds_data))
- @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
- return_value='hv_netvsc')
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.device_driver",
+ return_value="hv_netvsc",
+ )
def test_match_driver_for_netvsc(self, m_driver):
"""parse_network_config emits driver when using netvsc."""
- expected = {'ethernets': {
- 'eth0': {
- 'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 100},
- 'dhcp6': False,
- 'match': {
- 'macaddress': '00:0d:3a:04:75:98',
- 'driver': 'hv_netvsc',
- },
- 'set-name': 'eth0'
- }}, 'version': 2}
+ expected = {
+ "ethernets": {
+ "eth0": {
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ "dhcp6": False,
+ "match": {
+ "macaddress": "00:0d:3a:04:75:98",
+ "driver": "hv_netvsc",
+ },
+ "set-name": "eth0",
+ }
+ },
+ "version": 2,
+ }
self.assertEqual(expected, dsaz.parse_network_config(NETWORK_METADATA))
- @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
- return_value=None)
- @mock.patch('cloudinit.net.generate_fallback_config')
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
+ )
+ @mock.patch("cloudinit.net.generate_fallback_config")
def test_parse_network_config_uses_fallback_cfg_when_no_network_metadata(
- self, m_fallback_config, m_driver):
+ self, m_fallback_config, m_driver
+ ):
"""parse_network_config generates fallback network config when the
IMDS instance metadata is corrupted/invalid, such as when
network metadata is not present.
"""
imds_metadata_missing_network_metadata = copy.deepcopy(
- NETWORK_METADATA)
- del imds_metadata_missing_network_metadata['network']
+ NETWORK_METADATA
+ )
+ del imds_metadata_missing_network_metadata["network"]
m_fallback_config.return_value = self.fallback_config
self.assertEqual(
self.fallback_config,
- dsaz.parse_network_config(
- imds_metadata_missing_network_metadata))
+ dsaz.parse_network_config(imds_metadata_missing_network_metadata),
+ )
- @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
- return_value=None)
- @mock.patch('cloudinit.net.generate_fallback_config')
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
+ )
+ @mock.patch("cloudinit.net.generate_fallback_config")
def test_parse_network_config_uses_fallback_cfg_when_no_interface_metadata(
- self, m_fallback_config, m_driver):
+ self, m_fallback_config, m_driver
+ ):
"""parse_network_config generates fallback network config when the
IMDS instance metadata is corrupted/invalid, such as when
network interface metadata is not present.
"""
imds_metadata_missing_interface_metadata = copy.deepcopy(
- NETWORK_METADATA)
- del imds_metadata_missing_interface_metadata['network']['interface']
+ NETWORK_METADATA
+ )
+ del imds_metadata_missing_interface_metadata["network"]["interface"]
m_fallback_config.return_value = self.fallback_config
self.assertEqual(
self.fallback_config,
dsaz.parse_network_config(
- imds_metadata_missing_interface_metadata))
+ imds_metadata_missing_interface_metadata
+ ),
+ )
class TestGetMetadataFromIMDS(HttprettyTestCase):
@@ -412,175 +464,218 @@ class TestGetMetadataFromIMDS(HttprettyTestCase):
dsaz.IMDS_URL
)
- @mock.patch(MOCKPATH + 'readurl')
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4', autospec=True)
- @mock.patch(MOCKPATH + 'net.is_up', autospec=True)
+ @mock.patch(MOCKPATH + "readurl")
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4", autospec=True)
+ @mock.patch(MOCKPATH + "net.is_up", autospec=True)
def test_get_metadata_does_not_dhcp_if_network_is_up(
- self, m_net_is_up, m_dhcp, m_readurl):
+ self, m_net_is_up, m_dhcp, m_readurl
+ ):
"""Do not perform DHCP setup when nic is already up."""
m_net_is_up.return_value = True
m_readurl.return_value = url_helper.StringResponse(
- json.dumps(NETWORK_METADATA).encode('utf-8'))
+ json.dumps(NETWORK_METADATA).encode("utf-8")
+ )
self.assertEqual(
- NETWORK_METADATA,
- dsaz.get_metadata_from_imds('eth9', retries=3))
+ NETWORK_METADATA, dsaz.get_metadata_from_imds("eth9", retries=3)
+ )
- m_net_is_up.assert_called_with('eth9')
+ m_net_is_up.assert_called_with("eth9")
m_dhcp.assert_not_called()
self.assertIn(
"Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
- self.logs.getvalue())
+ self.logs.getvalue(),
+ )
- @mock.patch(MOCKPATH + 'readurl', autospec=True)
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
- @mock.patch(MOCKPATH + 'net.is_up')
+ @mock.patch(MOCKPATH + "readurl", autospec=True)
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4")
+ @mock.patch(MOCKPATH + "net.is_up")
def test_get_metadata_uses_instance_url(
- self, m_net_is_up, m_dhcp, m_readurl):
+ self, m_net_is_up, m_dhcp, m_readurl
+ ):
"""Make sure readurl is called with the correct url when accessing
metadata"""
m_net_is_up.return_value = True
m_readurl.return_value = url_helper.StringResponse(
- json.dumps(IMDS_NETWORK_METADATA).encode('utf-8'))
+ json.dumps(IMDS_NETWORK_METADATA).encode("utf-8")
+ )
dsaz.get_metadata_from_imds(
- 'eth0', retries=3, md_type=dsaz.metadata_type.all)
+ "eth0", retries=3, md_type=dsaz.metadata_type.all
+ )
m_readurl.assert_called_with(
- "http://169.254.169.254/metadata/instance?api-version="
- "2019-06-01", exception_cb=mock.ANY,
- headers=mock.ANY, retries=mock.ANY,
- timeout=mock.ANY, infinite=False)
+ "http://169.254.169.254/metadata/instance?api-version=2019-06-01",
+ exception_cb=mock.ANY,
+ headers=mock.ANY,
+ retries=mock.ANY,
+ timeout=mock.ANY,
+ infinite=False,
+ )
- @mock.patch(MOCKPATH + 'readurl', autospec=True)
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
- @mock.patch(MOCKPATH + 'net.is_up')
+ @mock.patch(MOCKPATH + "readurl", autospec=True)
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4")
+ @mock.patch(MOCKPATH + "net.is_up")
def test_get_network_metadata_uses_network_url(
- self, m_net_is_up, m_dhcp, m_readurl):
+ self, m_net_is_up, m_dhcp, m_readurl
+ ):
"""Make sure readurl is called with the correct url when accessing
network metadata"""
m_net_is_up.return_value = True
m_readurl.return_value = url_helper.StringResponse(
- json.dumps(IMDS_NETWORK_METADATA).encode('utf-8'))
+ json.dumps(IMDS_NETWORK_METADATA).encode("utf-8")
+ )
dsaz.get_metadata_from_imds(
- 'eth0', retries=3, md_type=dsaz.metadata_type.network)
+ "eth0", retries=3, md_type=dsaz.metadata_type.network
+ )
m_readurl.assert_called_with(
"http://169.254.169.254/metadata/instance/network?api-version="
- "2019-06-01", exception_cb=mock.ANY,
- headers=mock.ANY, retries=mock.ANY,
- timeout=mock.ANY, infinite=False)
+ "2019-06-01",
+ exception_cb=mock.ANY,
+ headers=mock.ANY,
+ retries=mock.ANY,
+ timeout=mock.ANY,
+ infinite=False,
+ )
- @mock.patch(MOCKPATH + 'readurl', autospec=True)
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
- @mock.patch(MOCKPATH + 'net.is_up')
+ @mock.patch(MOCKPATH + "readurl", autospec=True)
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4")
+ @mock.patch(MOCKPATH + "net.is_up")
def test_get_default_metadata_uses_instance_url(
- self, m_net_is_up, m_dhcp, m_readurl):
+ self, m_net_is_up, m_dhcp, m_readurl
+ ):
"""Make sure readurl is called with the correct url when accessing
metadata"""
m_net_is_up.return_value = True
m_readurl.return_value = url_helper.StringResponse(
- json.dumps(IMDS_NETWORK_METADATA).encode('utf-8'))
+ json.dumps(IMDS_NETWORK_METADATA).encode("utf-8")
+ )
- dsaz.get_metadata_from_imds(
- 'eth0', retries=3)
+ dsaz.get_metadata_from_imds("eth0", retries=3)
m_readurl.assert_called_with(
- "http://169.254.169.254/metadata/instance?api-version="
- "2019-06-01", exception_cb=mock.ANY,
- headers=mock.ANY, retries=mock.ANY,
- timeout=mock.ANY, infinite=False)
+ "http://169.254.169.254/metadata/instance?api-version=2019-06-01",
+ exception_cb=mock.ANY,
+ headers=mock.ANY,
+ retries=mock.ANY,
+ timeout=mock.ANY,
+ infinite=False,
+ )
- @mock.patch(MOCKPATH + 'readurl', autospec=True)
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
- @mock.patch(MOCKPATH + 'net.is_up')
+ @mock.patch(MOCKPATH + "readurl", autospec=True)
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4")
+ @mock.patch(MOCKPATH + "net.is_up")
def test_get_metadata_uses_extended_url(
- self, m_net_is_up, m_dhcp, m_readurl):
+ self, m_net_is_up, m_dhcp, m_readurl
+ ):
"""Make sure readurl is called with the correct url when accessing
metadata"""
m_net_is_up.return_value = True
m_readurl.return_value = url_helper.StringResponse(
- json.dumps(IMDS_NETWORK_METADATA).encode('utf-8'))
+ json.dumps(IMDS_NETWORK_METADATA).encode("utf-8")
+ )
dsaz.get_metadata_from_imds(
- 'eth0', retries=3, md_type=dsaz.metadata_type.all,
- api_version="2021-08-01")
+ "eth0",
+ retries=3,
+ md_type=dsaz.metadata_type.all,
+ api_version="2021-08-01",
+ )
m_readurl.assert_called_with(
"http://169.254.169.254/metadata/instance?api-version="
- "2021-08-01&extended=true", exception_cb=mock.ANY,
- headers=mock.ANY, retries=mock.ANY,
- timeout=mock.ANY, infinite=False)
+ "2021-08-01&extended=true",
+ exception_cb=mock.ANY,
+ headers=mock.ANY,
+ retries=mock.ANY,
+ timeout=mock.ANY,
+ infinite=False,
+ )
- @mock.patch(MOCKPATH + 'readurl', autospec=True)
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4WithReporting', autospec=True)
- @mock.patch(MOCKPATH + 'net.is_up', autospec=True)
+ @mock.patch(MOCKPATH + "readurl", autospec=True)
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4WithReporting", autospec=True)
+ @mock.patch(MOCKPATH + "net.is_up", autospec=True)
def test_get_metadata_performs_dhcp_when_network_is_down(
- self, m_net_is_up, m_dhcp, m_readurl):
+ self, m_net_is_up, m_dhcp, m_readurl
+ ):
"""Perform DHCP setup when nic is not up."""
m_net_is_up.return_value = False
m_readurl.return_value = url_helper.StringResponse(
- json.dumps(NETWORK_METADATA).encode('utf-8'))
+ json.dumps(NETWORK_METADATA).encode("utf-8")
+ )
self.assertEqual(
- NETWORK_METADATA,
- dsaz.get_metadata_from_imds('eth9', retries=2))
+ NETWORK_METADATA, dsaz.get_metadata_from_imds("eth9", retries=2)
+ )
- m_net_is_up.assert_called_with('eth9')
- m_dhcp.assert_called_with(mock.ANY, 'eth9')
+ m_net_is_up.assert_called_with("eth9")
+ m_dhcp.assert_called_with(mock.ANY, "eth9")
self.assertIn(
"Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
- self.logs.getvalue())
+ self.logs.getvalue(),
+ )
m_readurl.assert_called_with(
- self.network_md_url, exception_cb=mock.ANY,
- headers={'Metadata': 'true'}, retries=2,
- timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS, infinite=False)
+ self.network_md_url,
+ exception_cb=mock.ANY,
+ headers={"Metadata": "true"},
+ retries=2,
+ timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS,
+ infinite=False,
+ )
- @mock.patch('cloudinit.url_helper.time.sleep')
- @mock.patch(MOCKPATH + 'net.is_up', autospec=True)
+ @mock.patch("cloudinit.url_helper.time.sleep")
+ @mock.patch(MOCKPATH + "net.is_up", autospec=True)
def test_get_metadata_from_imds_empty_when_no_imds_present(
- self, m_net_is_up, m_sleep):
+ self, m_net_is_up, m_sleep
+ ):
"""Return empty dict when IMDS network metadata is absent."""
httpretty.register_uri(
httpretty.GET,
- dsaz.IMDS_URL + '/instance?api-version=2017-12-01',
- body={}, status=404)
+ dsaz.IMDS_URL + "/instance?api-version=2017-12-01",
+ body={},
+ status=404,
+ )
m_net_is_up.return_value = True # skips dhcp
- self.assertEqual({}, dsaz.get_metadata_from_imds('eth9', retries=2))
+ self.assertEqual({}, dsaz.get_metadata_from_imds("eth9", retries=2))
- m_net_is_up.assert_called_with('eth9')
+ m_net_is_up.assert_called_with("eth9")
self.assertEqual([mock.call(1), mock.call(1)], m_sleep.call_args_list)
self.assertIn(
"Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
- self.logs.getvalue())
+ self.logs.getvalue(),
+ )
- @mock.patch('requests.Session.request')
- @mock.patch('cloudinit.url_helper.time.sleep')
- @mock.patch(MOCKPATH + 'net.is_up', autospec=True)
+ @mock.patch("requests.Session.request")
+ @mock.patch("cloudinit.url_helper.time.sleep")
+ @mock.patch(MOCKPATH + "net.is_up", autospec=True)
def test_get_metadata_from_imds_retries_on_timeout(
- self, m_net_is_up, m_sleep, m_request):
+ self, m_net_is_up, m_sleep, m_request
+ ):
"""Retry IMDS network metadata on timeout errors."""
self.attempt = 0
- m_request.side_effect = requests.Timeout('Fake Connection Timeout')
+ m_request.side_effect = requests.Timeout("Fake Connection Timeout")
def retry_callback(request, uri, headers):
self.attempt += 1
- raise requests.Timeout('Fake connection timeout')
+ raise requests.Timeout("Fake connection timeout")
httpretty.register_uri(
httpretty.GET,
- dsaz.IMDS_URL + 'instance?api-version=2017-12-01',
- body=retry_callback)
+ dsaz.IMDS_URL + "instance?api-version=2017-12-01",
+ body=retry_callback,
+ )
m_net_is_up.return_value = True # skips dhcp
- self.assertEqual({}, dsaz.get_metadata_from_imds('eth9', retries=3))
+ self.assertEqual({}, dsaz.get_metadata_from_imds("eth9", retries=3))
- m_net_is_up.assert_called_with('eth9')
- self.assertEqual([mock.call(1)]*3, m_sleep.call_args_list)
+ m_net_is_up.assert_called_with("eth9")
+ self.assertEqual([mock.call(1)] * 3, m_sleep.call_args_list)
self.assertIn(
"Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
- self.logs.getvalue())
+ self.logs.getvalue(),
+ )
class TestAzureDataSource(CiTestCase):
@@ -593,25 +688,35 @@ class TestAzureDataSource(CiTestCase):
# patch cloud_dir, so our 'seed_dir' is guaranteed empty
self.paths = helpers.Paths(
- {'cloud_dir': self.tmp, 'run_dir': self.tmp})
- self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
+ {"cloud_dir": self.tmp, "run_dir": self.tmp}
+ )
+ self.waagent_d = os.path.join(self.tmp, "var", "lib", "waagent")
self.patches = ExitStack()
self.addCleanup(self.patches.close)
- self.patches.enter_context(mock.patch.object(
- dsaz, '_get_random_seed', return_value='wild'))
+ self.patches.enter_context(
+ mock.patch.object(dsaz, "_get_random_seed", return_value="wild")
+ )
self.m_get_metadata_from_imds = self.patches.enter_context(
mock.patch.object(
- dsaz, 'get_metadata_from_imds',
- mock.MagicMock(return_value=NETWORK_METADATA)))
+ dsaz,
+ "get_metadata_from_imds",
+ mock.MagicMock(return_value=NETWORK_METADATA),
+ )
+ )
self.m_fallback_nic = self.patches.enter_context(
- mock.patch('cloudinit.sources.net.find_fallback_nic',
- return_value='eth9'))
+ mock.patch(
+ "cloudinit.sources.net.find_fallback_nic", return_value="eth9"
+ )
+ )
self.m_remove_ubuntu_network_scripts = self.patches.enter_context(
mock.patch.object(
- dsaz, 'maybe_remove_ubuntu_network_config_scripts',
- mock.MagicMock()))
+ dsaz,
+ "maybe_remove_ubuntu_network_config_scripts",
+ mock.MagicMock(),
+ )
+ )
super(TestAzureDataSource, self).setUp()
def apply_patches(self, patches):
@@ -619,15 +724,21 @@ class TestAzureDataSource(CiTestCase):
self.patches.enter_context(mock.patch.object(module, name, new))
def _get_mockds(self):
- sysctl_out = "dev.storvsc.3.%pnpinfo: "\
- "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f "\
- "deviceid=f8b3781b-1e82-4818-a1c3-63d806ec15bb\n"
- sysctl_out += "dev.storvsc.2.%pnpinfo: "\
- "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f "\
- "deviceid=f8b3781a-1e82-4818-a1c3-63d806ec15bb\n"
- sysctl_out += "dev.storvsc.1.%pnpinfo: "\
- "classid=32412632-86cb-44a2-9b5c-50d1417354f5 "\
- "deviceid=00000000-0001-8899-0000-000000000000\n"
+ sysctl_out = (
+ "dev.storvsc.3.%pnpinfo: "
+ "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f "
+ "deviceid=f8b3781b-1e82-4818-a1c3-63d806ec15bb\n"
+ )
+ sysctl_out += (
+ "dev.storvsc.2.%pnpinfo: "
+ "classid=ba6163d9-04a1-4d29-b605-72e2ffb1dc7f "
+ "deviceid=f8b3781a-1e82-4818-a1c3-63d806ec15bb\n"
+ )
+ sysctl_out += (
+ "dev.storvsc.1.%pnpinfo: "
+ "classid=32412632-86cb-44a2-9b5c-50d1417354f5 "
+ "deviceid=00000000-0001-8899-0000-000000000000\n"
+ )
camctl_devbus = """
scbus0 on ata0 bus 0
scbus1 on ata1 bus 0
@@ -642,45 +753,57 @@ scbus-1 on xpt0 bus 0
<Msft Virtual Disk 1.0> at scbus2 target 0 lun 0 (da0,pass1)
<Msft Virtual Disk 1.0> at scbus3 target 1 lun 0 (da1,pass2)
"""
- self.apply_patches([
- (dsaz, 'get_dev_storvsc_sysctl', mock.MagicMock(
- return_value=sysctl_out)),
- (dsaz, 'get_camcontrol_dev_bus', mock.MagicMock(
- return_value=camctl_devbus)),
- (dsaz, 'get_camcontrol_dev', mock.MagicMock(
- return_value=camctl_dev))
- ])
+ self.apply_patches(
+ [
+ (
+ dsaz,
+ "get_dev_storvsc_sysctl",
+ mock.MagicMock(return_value=sysctl_out),
+ ),
+ (
+ dsaz,
+ "get_camcontrol_dev_bus",
+ mock.MagicMock(return_value=camctl_devbus),
+ ),
+ (
+ dsaz,
+ "get_camcontrol_dev",
+ mock.MagicMock(return_value=camctl_dev),
+ ),
+ ]
+ )
return dsaz
- def _get_ds(self, data, distro='ubuntu',
- apply_network=None, instance_id=None):
-
+ def _get_ds(
+ self, data, distro="ubuntu", apply_network=None, instance_id=None
+ ):
def _wait_for_files(flist, _maxwait=None, _naplen=None):
- data['waited'] = flist
+ data["waited"] = flist
return []
def _load_possible_azure_ds(seed_dir, cache_dir):
yield seed_dir
yield dsaz.DEFAULT_PROVISIONING_ISO_DEV
- yield from data.get('dsdevs', [])
+ yield from data.get("dsdevs", [])
if cache_dir:
yield cache_dir
seed_dir = os.path.join(self.paths.seed_dir, "azure")
- if data.get('ovfcontent') is not None:
- populate_dir(seed_dir,
- {'ovf-env.xml': data['ovfcontent']})
+ if data.get("ovfcontent") is not None:
+ populate_dir(seed_dir, {"ovf-env.xml": data["ovfcontent"]})
- dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
+ dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d
self.m_is_platform_viable = mock.MagicMock(autospec=True)
self.m_get_metadata_from_fabric = mock.MagicMock(
- return_value={'public-keys': []})
+ return_value={"public-keys": []}
+ )
self.m_report_failure_to_fabric = mock.MagicMock(autospec=True)
self.m_ephemeral_dhcpv4 = mock.MagicMock()
self.m_ephemeral_dhcpv4_with_reporting = mock.MagicMock()
self.m_list_possible_azure_ds = mock.MagicMock(
- side_effect=_load_possible_azure_ds)
+ side_effect=_load_possible_azure_ds
+ )
if instance_id:
self.instance_id = instance_id
@@ -688,39 +811,59 @@ scbus-1 on xpt0 bus 0
self.instance_id = EXAMPLE_UUID
def _dmi_mocks(key):
- if key == 'system-uuid':
+ if key == "system-uuid":
return self.instance_id
- elif key == 'chassis-asset-tag':
- return '7783-7084-3265-9085-8269-3286-77'
-
- self.apply_patches([
- (dsaz, 'list_possible_azure_ds',
- self.m_list_possible_azure_ds),
- (dsaz, '_is_platform_viable',
- self.m_is_platform_viable),
- (dsaz, 'get_metadata_from_fabric',
- self.m_get_metadata_from_fabric),
- (dsaz, 'report_failure_to_fabric',
- self.m_report_failure_to_fabric),
- (dsaz, 'EphemeralDHCPv4', self.m_ephemeral_dhcpv4),
- (dsaz, 'EphemeralDHCPv4WithReporting',
- self.m_ephemeral_dhcpv4_with_reporting),
- (dsaz, 'get_boot_telemetry', mock.MagicMock()),
- (dsaz, 'get_system_info', mock.MagicMock()),
- (dsaz.subp, 'which', lambda x: True),
- (dsaz.dmi, 'read_dmi_data', mock.MagicMock(
- side_effect=_dmi_mocks)),
- (dsaz.util, 'wait_for_files', mock.MagicMock(
- side_effect=_wait_for_files)),
- ])
+ elif key == "chassis-asset-tag":
+ return "7783-7084-3265-9085-8269-3286-77"
+
+ self.apply_patches(
+ [
+ (
+ dsaz,
+ "list_possible_azure_ds",
+ self.m_list_possible_azure_ds,
+ ),
+ (dsaz, "_is_platform_viable", self.m_is_platform_viable),
+ (
+ dsaz,
+ "get_metadata_from_fabric",
+ self.m_get_metadata_from_fabric,
+ ),
+ (
+ dsaz,
+ "report_failure_to_fabric",
+ self.m_report_failure_to_fabric,
+ ),
+ (dsaz, "EphemeralDHCPv4", self.m_ephemeral_dhcpv4),
+ (
+ dsaz,
+ "EphemeralDHCPv4WithReporting",
+ self.m_ephemeral_dhcpv4_with_reporting,
+ ),
+ (dsaz, "get_boot_telemetry", mock.MagicMock()),
+ (dsaz, "get_system_info", mock.MagicMock()),
+ (dsaz.subp, "which", lambda x: True),
+ (
+ dsaz.dmi,
+ "read_dmi_data",
+ mock.MagicMock(side_effect=_dmi_mocks),
+ ),
+ (
+ dsaz.util,
+ "wait_for_files",
+ mock.MagicMock(side_effect=_wait_for_files),
+ ),
+ ]
+ )
if isinstance(distro, str):
distro_cls = distros.fetch(distro)
- distro = distro_cls(distro, data.get('sys_cfg', {}), self.paths)
+ distro = distro_cls(distro, data.get("sys_cfg", {}), self.paths)
dsrc = dsaz.DataSourceAzure(
- data.get('sys_cfg', {}), distro=distro, paths=self.paths)
+ data.get("sys_cfg", {}), distro=distro, paths=self.paths
+ )
if apply_network is not None:
- dsrc.ds_cfg['apply_network_config'] = apply_network
+ dsrc.ds_cfg["apply_network_config"] = apply_network
return dsrc
@@ -774,19 +917,18 @@ scbus-1 on xpt0 bus 0
data = {}
dsrc = self._get_ds(data)
self.m_is_platform_viable.return_value = False
- with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata, \
- mock.patch.object(dsrc, '_report_failure') as m_report_failure:
+ with mock.patch.object(
+ dsrc, "crawl_metadata"
+ ) as m_crawl_metadata, mock.patch.object(
+ dsrc, "_report_failure"
+ ) as m_report_failure:
ret = dsrc.get_data()
self.m_is_platform_viable.assert_called_with(dsrc.seed_dir)
self.assertFalse(ret)
# Assert that for non viable platforms,
# there is no communication with the Azure datasource.
- self.assertEqual(
- 0,
- m_crawl_metadata.call_count)
- self.assertEqual(
- 0,
- m_report_failure.call_count)
+ self.assertEqual(0, m_crawl_metadata.call_count)
+ self.assertEqual(0, m_report_failure.call_count)
def test_platform_viable_but_no_devs_should_return_no_datasource(self):
"""For platforms where the Azure platform is viable
@@ -797,170 +939,190 @@ scbus-1 on xpt0 bus 0
"""
data = {}
dsrc = self._get_ds(data)
- with mock.patch.object(dsrc, '_report_failure') as m_report_failure:
+ with mock.patch.object(dsrc, "_report_failure") as m_report_failure:
self.m_is_platform_viable.return_value = True
ret = dsrc.get_data()
self.m_is_platform_viable.assert_called_with(dsrc.seed_dir)
self.assertFalse(ret)
- self.assertEqual(
- 1,
- m_report_failure.call_count)
+ self.assertEqual(1, m_report_failure.call_count)
def test_crawl_metadata_exception_returns_no_datasource(self):
data = {}
dsrc = self._get_ds(data)
self.m_is_platform_viable.return_value = True
- with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata:
+ with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata:
m_crawl_metadata.side_effect = Exception
ret = dsrc.get_data()
self.m_is_platform_viable.assert_called_with(dsrc.seed_dir)
- self.assertEqual(
- 1,
- m_crawl_metadata.call_count)
+ self.assertEqual(1, m_crawl_metadata.call_count)
self.assertFalse(ret)
def test_crawl_metadata_exception_should_report_failure_with_msg(self):
data = {}
dsrc = self._get_ds(data)
self.m_is_platform_viable.return_value = True
- with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata, \
- mock.patch.object(dsrc, '_report_failure') as m_report_failure:
+ with mock.patch.object(
+ dsrc, "crawl_metadata"
+ ) as m_crawl_metadata, mock.patch.object(
+ dsrc, "_report_failure"
+ ) as m_report_failure:
m_crawl_metadata.side_effect = Exception
dsrc.get_data()
- self.assertEqual(
- 1,
- m_crawl_metadata.call_count)
+ self.assertEqual(1, m_crawl_metadata.call_count)
m_report_failure.assert_called_once_with(
- description=dsaz.DEFAULT_REPORT_FAILURE_USER_VISIBLE_MESSAGE)
+ description=dsaz.DEFAULT_REPORT_FAILURE_USER_VISIBLE_MESSAGE
+ )
def test_crawl_metadata_exc_should_log_could_not_crawl_msg(self):
data = {}
dsrc = self._get_ds(data)
self.m_is_platform_viable.return_value = True
- with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata:
+ with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata:
m_crawl_metadata.side_effect = Exception
dsrc.get_data()
- self.assertEqual(
- 1,
- m_crawl_metadata.call_count)
+ self.assertEqual(1, m_crawl_metadata.call_count)
self.assertIn(
- "Could not crawl Azure metadata",
- self.logs.getvalue())
+ "Could not crawl Azure metadata", self.logs.getvalue()
+ )
def test_basic_seed_dir(self):
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': {}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": {},
+ }
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(dsrc.userdata_raw, "")
- self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName'])
- self.assertTrue(os.path.isfile(
- os.path.join(self.waagent_d, 'ovf-env.xml')))
- self.assertEqual('azure', dsrc.cloud_name)
- self.assertEqual('azure', dsrc.platform_type)
+ self.assertEqual(dsrc.metadata["local-hostname"], odata["HostName"])
+ self.assertTrue(
+ os.path.isfile(os.path.join(self.waagent_d, "ovf-env.xml"))
+ )
+ self.assertEqual("azure", dsrc.cloud_name)
+ self.assertEqual("azure", dsrc.platform_type)
self.assertEqual(
- 'seed-dir (%s/seed/azure)' % self.tmp, dsrc.subplatform)
+ "seed-dir (%s/seed/azure)" % self.tmp, dsrc.subplatform
+ )
def test_basic_dev_file(self):
"""When a device path is used, present that in subplatform."""
- data = {'sys_cfg': {}, 'dsdevs': ['/dev/cd0']}
+ data = {"sys_cfg": {}, "dsdevs": ["/dev/cd0"]}
dsrc = self._get_ds(data)
# DSAzure will attempt to mount /dev/sr0 first, which should
# fail with mount error since the list of devices doesn't have
# /dev/sr0
- with mock.patch(MOCKPATH + 'util.mount_cb') as m_mount_cb:
+ with mock.patch(MOCKPATH + "util.mount_cb") as m_mount_cb:
m_mount_cb.side_effect = [
MountFailedError("fail"),
- ({'local-hostname': 'me'}, 'ud', {'cfg': ''}, {})
+ ({"local-hostname": "me"}, "ud", {"cfg": ""}, {}),
]
self.assertTrue(dsrc.get_data())
- self.assertEqual(dsrc.userdata_raw, 'ud')
- self.assertEqual(dsrc.metadata['local-hostname'], 'me')
- self.assertEqual('azure', dsrc.cloud_name)
- self.assertEqual('azure', dsrc.platform_type)
- self.assertEqual('config-disk (/dev/cd0)', dsrc.subplatform)
+ self.assertEqual(dsrc.userdata_raw, "ud")
+ self.assertEqual(dsrc.metadata["local-hostname"], "me")
+ self.assertEqual("azure", dsrc.cloud_name)
+ self.assertEqual("azure", dsrc.platform_type)
+ self.assertEqual("config-disk (/dev/cd0)", dsrc.subplatform)
def test_get_data_non_ubuntu_will_not_remove_network_scripts(self):
"""get_data on non-Ubuntu will not remove ubuntu net scripts."""
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': {}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": {},
+ }
- dsrc = self._get_ds(data, distro='debian')
+ dsrc = self._get_ds(data, distro="debian")
dsrc.get_data()
self.m_remove_ubuntu_network_scripts.assert_not_called()
def test_get_data_on_ubuntu_will_remove_network_scripts(self):
"""get_data will remove ubuntu net scripts on Ubuntu distro."""
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
+ }
- dsrc = self._get_ds(data, distro='ubuntu')
+ dsrc = self._get_ds(data, distro="ubuntu")
dsrc.get_data()
self.m_remove_ubuntu_network_scripts.assert_called_once_with()
def test_get_data_on_ubuntu_will_not_remove_network_scripts_disabled(self):
"""When apply_network_config false, do not remove scripts on Ubuntu."""
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': False}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": False}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
+ }
- dsrc = self._get_ds(data, distro='ubuntu')
+ dsrc = self._get_ds(data, distro="ubuntu")
dsrc.get_data()
self.m_remove_ubuntu_network_scripts.assert_not_called()
def test_crawl_metadata_returns_structured_data_and_caches_nothing(self):
"""Return all structured metadata and cache no class attributes."""
yaml_cfg = ""
- odata = {'HostName': "myhost", 'UserName': "myuser",
- 'UserData': {'text': 'FOOBAR', 'encoding': 'plain'},
- 'dscfg': {'text': yaml_cfg, 'encoding': 'plain'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': {}}
+ odata = {
+ "HostName": "myhost",
+ "UserName": "myuser",
+ "UserData": {"text": "FOOBAR", "encoding": "plain"},
+ "dscfg": {"text": yaml_cfg, "encoding": "plain"},
+ }
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": {},
+ }
dsrc = self._get_ds(data)
expected_cfg = {
- 'PreprovisionedVMType': None,
- 'PreprovisionedVm': False,
- 'datasource': {'Azure': {}},
- 'system_info': {'default_user': {'name': 'myuser'}}}
+ "PreprovisionedVMType": None,
+ "PreprovisionedVm": False,
+ "datasource": {"Azure": {}},
+ "system_info": {"default_user": {"name": "myuser"}},
+ }
expected_metadata = {
- 'azure_data': {
- 'configurationsettype': 'LinuxProvisioningConfiguration'},
- 'imds': NETWORK_METADATA,
- 'instance-id': EXAMPLE_UUID,
- 'local-hostname': 'myhost',
- 'random_seed': 'wild'}
+ "azure_data": {
+ "configurationsettype": "LinuxProvisioningConfiguration"
+ },
+ "imds": NETWORK_METADATA,
+ "instance-id": EXAMPLE_UUID,
+ "local-hostname": "myhost",
+ "random_seed": "wild",
+ }
crawled_metadata = dsrc.crawl_metadata()
self.assertCountEqual(
crawled_metadata.keys(),
- ['cfg', 'files', 'metadata', 'userdata_raw'])
- self.assertEqual(crawled_metadata['cfg'], expected_cfg)
+ ["cfg", "files", "metadata", "userdata_raw"],
+ )
+ self.assertEqual(crawled_metadata["cfg"], expected_cfg)
self.assertEqual(
- list(crawled_metadata['files'].keys()), ['ovf-env.xml'])
+ list(crawled_metadata["files"].keys()), ["ovf-env.xml"]
+ )
self.assertIn(
- b'<HostName>myhost</HostName>',
- crawled_metadata['files']['ovf-env.xml'])
- self.assertEqual(crawled_metadata['metadata'], expected_metadata)
- self.assertEqual(crawled_metadata['userdata_raw'], 'FOOBAR')
+ b"<HostName>myhost</HostName>",
+ crawled_metadata["files"]["ovf-env.xml"],
+ )
+ self.assertEqual(crawled_metadata["metadata"], expected_metadata)
+ self.assertEqual(crawled_metadata["userdata_raw"], "FOOBAR")
self.assertEqual(dsrc.userdata_raw, None)
self.assertEqual(dsrc.metadata, {})
self.assertEqual(dsrc._metadata_imds, UNSET)
- self.assertFalse(os.path.isfile(
- os.path.join(self.waagent_d, 'ovf-env.xml')))
+ self.assertFalse(
+ os.path.isfile(os.path.join(self.waagent_d, "ovf-env.xml"))
+ )
def test_crawl_metadata_raises_invalid_metadata_on_error(self):
"""crawl_metadata raises an exception on invalid ovf-env.xml."""
- data = {'ovfcontent': "BOGUS", 'sys_cfg': {}}
+ data = {"ovfcontent": "BOGUS", "sys_cfg": {}}
dsrc = self._get_ds(data)
- error_msg = ('BrokenAzureDataSource: Invalid ovf-env.xml:'
- ' syntax error: line 1, column 0')
+ error_msg = (
+ "BrokenAzureDataSource: Invalid ovf-env.xml:"
+ " syntax error: line 1, column 0"
+ )
with self.assertRaises(InvalidMetaDataException) as cm:
dsrc.crawl_metadata()
self.assertEqual(str(cm.exception), error_msg)
@@ -971,20 +1133,19 @@ scbus-1 on xpt0 bus 0
platform_settings={"PreprovisionedVm": "False"}
)
- data = {
- 'ovfcontent': ovfenv,
- 'sys_cfg': {}
- }
+ data = {"ovfcontent": ovfenv, "sys_cfg": {}}
dsrc = self._get_ds(data)
dsrc.crawl_metadata()
self.assertEqual(1, self.m_get_metadata_from_imds.call_count)
@mock.patch(
- 'cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting')
- @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
+ "cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting"
+ )
+ @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file")
@mock.patch(
- 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready')
- @mock.patch('cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds')
+ "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready"
+ )
+ @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds")
def test_crawl_metadata_call_imds_twice_with_reprovision(
self, poll_imds_func, m_report_ready, m_write, m_dhcp
):
@@ -993,21 +1154,20 @@ scbus-1 on xpt0 bus 0
platform_settings={"PreprovisionedVm": "True"}
)
- data = {
- 'ovfcontent': ovfenv,
- 'sys_cfg': {}
- }
+ data = {"ovfcontent": ovfenv, "sys_cfg": {}}
dsrc = self._get_ds(data)
poll_imds_func.return_value = ovfenv
dsrc.crawl_metadata()
self.assertEqual(2, self.m_get_metadata_from_imds.call_count)
@mock.patch(
- 'cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting')
- @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
+ "cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting"
+ )
+ @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file")
@mock.patch(
- 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready')
- @mock.patch('cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds')
+ "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready"
+ )
+ @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds")
def test_crawl_metadata_on_reprovision_reports_ready(
self, poll_imds_func, m_report_ready, m_write, m_dhcp
):
@@ -1016,37 +1176,36 @@ scbus-1 on xpt0 bus 0
platform_settings={"PreprovisionedVm": "True"}
)
- data = {
- 'ovfcontent': ovfenv,
- 'sys_cfg': {}
- }
+ data = {"ovfcontent": ovfenv, "sys_cfg": {}}
dsrc = self._get_ds(data)
poll_imds_func.return_value = ovfenv
dsrc.crawl_metadata()
self.assertEqual(1, m_report_ready.call_count)
@mock.patch(
- 'cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting')
- @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
+ "cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting"
+ )
+ @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file")
@mock.patch(
- 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready')
- @mock.patch('cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds')
+ "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready"
+ )
+ @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds")
@mock.patch(
- 'cloudinit.sources.DataSourceAzure.DataSourceAzure.'
- '_wait_for_all_nics_ready')
+ "cloudinit.sources.DataSourceAzure.DataSourceAzure."
+ "_wait_for_all_nics_ready"
+ )
def test_crawl_metadata_waits_for_nic_on_savable_vms(
self, detect_nics, poll_imds_func, report_ready_func, m_write, m_dhcp
):
"""If reprovisioning, report ready at the end"""
ovfenv = construct_valid_ovf_env(
- platform_settings={"PreprovisionedVMType": "Savable",
- "PreprovisionedVm": "True"}
+ platform_settings={
+ "PreprovisionedVMType": "Savable",
+ "PreprovisionedVm": "True",
+ }
)
- data = {
- 'ovfcontent': ovfenv,
- 'sys_cfg': {}
- }
+ data = {"ovfcontent": ovfenv, "sys_cfg": {}}
dsrc = self._get_ds(data)
poll_imds_func.return_value = ovfenv
dsrc.crawl_metadata()
@@ -1054,18 +1213,27 @@ scbus-1 on xpt0 bus 0
self.assertEqual(1, detect_nics.call_count)
@mock.patch(
- 'cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting')
- @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
+ "cloudinit.sources.DataSourceAzure.EphemeralDHCPv4WithReporting"
+ )
+ @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file")
@mock.patch(
- 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready')
- @mock.patch('cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds')
+ "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready"
+ )
+ @mock.patch("cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds")
@mock.patch(
- 'cloudinit.sources.DataSourceAzure.DataSourceAzure.'
- '_wait_for_all_nics_ready')
- @mock.patch('os.path.isfile')
+ "cloudinit.sources.DataSourceAzure.DataSourceAzure."
+ "_wait_for_all_nics_ready"
+ )
+ @mock.patch("os.path.isfile")
def test_detect_nics_when_marker_present(
- self, is_file, detect_nics, poll_imds_func, report_ready_func, m_write,
- m_dhcp):
+ self,
+ is_file,
+ detect_nics,
+ poll_imds_func,
+ report_ready_func,
+ m_write,
+ m_dhcp,
+ ):
"""If reprovisioning, wait for nic attach if marker present"""
def is_file_ret(key):
@@ -1074,10 +1242,7 @@ scbus-1 on xpt0 bus 0
is_file.side_effect = is_file_ret
ovfenv = construct_valid_ovf_env()
- data = {
- 'ovfcontent': ovfenv,
- 'sys_cfg': {}
- }
+ data = {"ovfcontent": ovfenv, "sys_cfg": {}}
dsrc = self._get_ds(data)
poll_imds_func.return_value = ovfenv
@@ -1085,29 +1250,28 @@ scbus-1 on xpt0 bus 0
self.assertEqual(1, report_ready_func.call_count)
self.assertEqual(1, detect_nics.call_count)
- @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
- @mock.patch('cloudinit.sources.helpers.netlink.'
- 'wait_for_media_disconnect_connect')
+ @mock.patch("cloudinit.sources.DataSourceAzure.util.write_file")
@mock.patch(
- 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready')
- @mock.patch('cloudinit.sources.DataSourceAzure.readurl')
+ "cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect"
+ )
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready"
+ )
+ @mock.patch("cloudinit.sources.DataSourceAzure.readurl")
def test_crawl_metadata_on_reprovision_reports_ready_using_lease(
- self, m_readurl, m_report_ready,
- m_media_switch, m_write
+ self, m_readurl, m_report_ready, m_media_switch, m_write
):
"""If reprovisioning, report ready using the obtained lease"""
ovfenv = construct_valid_ovf_env(
platform_settings={"PreprovisionedVm": "True"}
)
- data = {
- 'ovfcontent': ovfenv,
- 'sys_cfg': {}
- }
+ data = {"ovfcontent": ovfenv, "sys_cfg": {}}
dsrc = self._get_ds(data)
- with mock.patch.object(dsrc.distro.networking, 'is_up') \
- as m_dsrc_distro_networking_is_up:
+ with mock.patch.object(
+ dsrc.distro.networking, "is_up"
+ ) as m_dsrc_distro_networking_is_up:
# For this mock, net should not be up,
# so that cached ephemeral won't be used.
@@ -1116,16 +1280,21 @@ scbus-1 on xpt0 bus 0
m_dsrc_distro_networking_is_up.return_value = False
lease = {
- 'interface': 'eth9', 'fixed-address': '192.168.2.9',
- 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
- 'unknown-245': '624c3620'}
- self.m_ephemeral_dhcpv4_with_reporting.return_value \
- .__enter__.return_value = lease
+ "interface": "eth9",
+ "fixed-address": "192.168.2.9",
+ "routers": "192.168.2.1",
+ "subnet-mask": "255.255.255.0",
+ "unknown-245": "624c3620",
+ }
+ self.m_ephemeral_dhcpv4_with_reporting.return_value.__enter__.return_value = ( # noqa: E501
+ lease
+ )
m_media_switch.return_value = None
reprovision_ovfenv = construct_valid_ovf_env()
m_readurl.return_value = url_helper.StringResponse(
- reprovision_ovfenv.encode('utf-8'))
+ reprovision_ovfenv.encode("utf-8")
+ )
dsrc.crawl_metadata()
self.assertEqual(2, m_report_ready.call_count)
@@ -1133,91 +1302,118 @@ scbus-1 on xpt0 bus 0
def test_waagent_d_has_0700_perms(self):
# we expect /var/lib/waagent to be created 0700
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertTrue(os.path.isdir(self.waagent_d))
self.assertEqual(stat.S_IMODE(os.stat(self.waagent_d).st_mode), 0o700)
- @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
- return_value=None)
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
+ )
def test_network_config_set_from_imds(self, m_driver):
"""Datasource.network_config returns IMDS network data."""
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
odata = {}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
+ }
expected_network_config = {
- 'ethernets': {
- 'eth0': {'set-name': 'eth0',
- 'match': {'macaddress': '00:0d:3a:04:75:98'},
- 'dhcp6': False,
- 'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 100}}},
- 'version': 2}
+ "ethernets": {
+ "eth0": {
+ "set-name": "eth0",
+ "match": {"macaddress": "00:0d:3a:04:75:98"},
+ "dhcp6": False,
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ }
+ },
+ "version": 2,
+ }
dsrc = self._get_ds(data)
dsrc.get_data()
self.assertEqual(expected_network_config, dsrc.network_config)
- @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
- return_value=None)
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
+ )
def test_network_config_set_from_imds_route_metric_for_secondary_nic(
- self, m_driver):
+ self, m_driver
+ ):
"""Datasource.network_config adds route-metric to secondary nics."""
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
odata = {}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
+ }
expected_network_config = {
- 'ethernets': {
- 'eth0': {'set-name': 'eth0',
- 'match': {'macaddress': '00:0d:3a:04:75:98'},
- 'dhcp6': False,
- 'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 100}},
- 'eth1': {'set-name': 'eth1',
- 'match': {'macaddress': '22:0d:3a:04:75:98'},
- 'dhcp6': False,
- 'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 200}},
- 'eth2': {'set-name': 'eth2',
- 'match': {'macaddress': '33:0d:3a:04:75:98'},
- 'dhcp6': False,
- 'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 300}}},
- 'version': 2}
+ "ethernets": {
+ "eth0": {
+ "set-name": "eth0",
+ "match": {"macaddress": "00:0d:3a:04:75:98"},
+ "dhcp6": False,
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ },
+ "eth1": {
+ "set-name": "eth1",
+ "match": {"macaddress": "22:0d:3a:04:75:98"},
+ "dhcp6": False,
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 200},
+ },
+ "eth2": {
+ "set-name": "eth2",
+ "match": {"macaddress": "33:0d:3a:04:75:98"},
+ "dhcp6": False,
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 300},
+ },
+ },
+ "version": 2,
+ }
imds_data = copy.deepcopy(NETWORK_METADATA)
- imds_data['network']['interface'].append(SECONDARY_INTERFACE)
+ imds_data["network"]["interface"].append(SECONDARY_INTERFACE)
third_intf = copy.deepcopy(SECONDARY_INTERFACE)
- third_intf['macAddress'] = third_intf['macAddress'].replace('22', '33')
- third_intf['ipv4']['subnet'][0]['address'] = '10.0.2.0'
- third_intf['ipv4']['ipAddress'][0]['privateIpAddress'] = '10.0.2.6'
- imds_data['network']['interface'].append(third_intf)
+ third_intf["macAddress"] = third_intf["macAddress"].replace("22", "33")
+ third_intf["ipv4"]["subnet"][0]["address"] = "10.0.2.0"
+ third_intf["ipv4"]["ipAddress"][0]["privateIpAddress"] = "10.0.2.6"
+ imds_data["network"]["interface"].append(third_intf)
self.m_get_metadata_from_imds.return_value = imds_data
dsrc = self._get_ds(data)
dsrc.get_data()
self.assertEqual(expected_network_config, dsrc.network_config)
- @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
- return_value=None)
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
+ )
def test_network_config_set_from_imds_for_secondary_nic_no_ip(
- self, m_driver):
+ self, m_driver
+ ):
"""If an IP address is empty then there should no config for it."""
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
odata = {}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
+ }
expected_network_config = {
- 'ethernets': {
- 'eth0': {'set-name': 'eth0',
- 'match': {'macaddress': '00:0d:3a:04:75:98'},
- 'dhcp6': False,
- 'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 100}}},
- 'version': 2}
+ "ethernets": {
+ "eth0": {
+ "set-name": "eth0",
+ "match": {"macaddress": "00:0d:3a:04:75:98"},
+ "dhcp6": False,
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ }
+ },
+ "version": 2,
+ }
imds_data = copy.deepcopy(NETWORK_METADATA)
- imds_data['network']['interface'].append(SECONDARY_INTERFACE_NO_IP)
+ imds_data["network"]["interface"].append(SECONDARY_INTERFACE_NO_IP)
self.m_get_metadata_from_imds.return_value = imds_data
dsrc = self._get_ds(data)
dsrc.get_data()
@@ -1225,91 +1421,110 @@ scbus-1 on xpt0 bus 0
def test_availability_zone_set_from_imds(self):
"""Datasource.availability returns IMDS platformFaultDomain."""
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
odata = {}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
+ }
dsrc = self._get_ds(data)
dsrc.get_data()
- self.assertEqual('0', dsrc.availability_zone)
+ self.assertEqual("0", dsrc.availability_zone)
def test_region_set_from_imds(self):
"""Datasource.region returns IMDS region location."""
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
odata = {}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
+ }
dsrc = self._get_ds(data)
dsrc.get_data()
- self.assertEqual('eastus2', dsrc.region)
+ self.assertEqual("eastus2", dsrc.region)
def test_sys_cfg_set_never_destroy_ntfs(self):
- sys_cfg = {'datasource': {'Azure': {
- 'never_destroy_ntfs': 'user-supplied-value'}}}
- data = {'ovfcontent': construct_valid_ovf_env(data={}),
- 'sys_cfg': sys_cfg}
+ sys_cfg = {
+ "datasource": {
+ "Azure": {"never_destroy_ntfs": "user-supplied-value"}
+ }
+ }
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data={}),
+ "sys_cfg": sys_cfg,
+ }
dsrc = self._get_ds(data)
ret = self._get_and_setup(dsrc)
self.assertTrue(ret)
- self.assertEqual(dsrc.ds_cfg.get(dsaz.DS_CFG_KEY_PRESERVE_NTFS),
- 'user-supplied-value')
+ self.assertEqual(
+ dsrc.ds_cfg.get(dsaz.DS_CFG_KEY_PRESERVE_NTFS),
+ "user-supplied-value",
+ )
def test_username_used(self):
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
+ data = {"ovfcontent": construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(dsrc.cfg['system_info']['default_user']['name'],
- "myuser")
+ self.assertEqual(
+ dsrc.cfg["system_info"]["default_user"]["name"], "myuser"
+ )
def test_password_given(self):
- odata = {'HostName': "myhost", 'UserName': "myuser",
- 'UserPassword': "mypass"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ odata = {
+ "HostName": "myhost",
+ "UserName": "myuser",
+ "UserPassword": "mypass",
+ }
+ data = {"ovfcontent": construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertIn('default_user', dsrc.cfg['system_info'])
- defuser = dsrc.cfg['system_info']['default_user']
+ self.assertIn("default_user", dsrc.cfg["system_info"])
+ defuser = dsrc.cfg["system_info"]["default_user"]
# default user should be updated username and should not be locked.
- self.assertEqual(defuser['name'], odata['UserName'])
- self.assertFalse(defuser['lock_passwd'])
+ self.assertEqual(defuser["name"], odata["UserName"])
+ self.assertFalse(defuser["lock_passwd"])
# passwd is crypt formated string $id$salt$encrypted
# encrypting plaintext with salt value of everything up to final '$'
# should equal that after the '$'
- pos = defuser['passwd'].rfind("$") + 1
- self.assertEqual(defuser['passwd'],
- crypt.crypt(odata['UserPassword'],
- defuser['passwd'][0:pos]))
+ pos = defuser["passwd"].rfind("$") + 1
+ self.assertEqual(
+ defuser["passwd"],
+ crypt.crypt(odata["UserPassword"], defuser["passwd"][0:pos]),
+ )
# the same hashed value should also be present in cfg['password']
- self.assertEqual(defuser['passwd'], dsrc.cfg['password'])
+ self.assertEqual(defuser["passwd"], dsrc.cfg["password"])
def test_user_not_locked_if_password_redacted(self):
- odata = {'HostName': "myhost", 'UserName': "myuser",
- 'UserPassword': dsaz.DEF_PASSWD_REDACTION}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ odata = {
+ "HostName": "myhost",
+ "UserName": "myuser",
+ "UserPassword": dsaz.DEF_PASSWD_REDACTION,
+ }
+ data = {"ovfcontent": construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertIn('default_user', dsrc.cfg['system_info'])
- defuser = dsrc.cfg['system_info']['default_user']
+ self.assertIn("default_user", dsrc.cfg["system_info"])
+ defuser = dsrc.cfg["system_info"]["default_user"]
# default user should be updated username and should not be locked.
- self.assertEqual(defuser['name'], odata['UserName'])
- self.assertIn('lock_passwd', defuser)
- self.assertFalse(defuser['lock_passwd'])
+ self.assertEqual(defuser["name"], odata["UserName"])
+ self.assertIn("lock_passwd", defuser)
+ self.assertFalse(defuser["lock_passwd"])
def test_userdata_plain(self):
mydata = "FOOBAR"
- odata = {'UserData': {'text': mydata, 'encoding': 'plain'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ odata = {"UserData": {"text": mydata, "encoding": "plain"}}
+ data = {"ovfcontent": construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
@@ -1318,72 +1533,86 @@ scbus-1 on xpt0 bus 0
def test_userdata_found(self):
mydata = "FOOBAR"
- odata = {'UserData': {'text': b64e(mydata), 'encoding': 'base64'}}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ odata = {"UserData": {"text": b64e(mydata), "encoding": "base64"}}
+ data = {"ovfcontent": construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, mydata.encode('utf-8'))
+ self.assertEqual(dsrc.userdata_raw, mydata.encode("utf-8"))
def test_default_ephemeral_configs_ephemeral_exists(self):
# make sure the ephemeral configs are correct if disk present
odata = {}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': {}}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": {},
+ }
orig_exists = dsaz.os.path.exists
def changed_exists(path):
- return True if path == dsaz.RESOURCE_DISK_PATH else orig_exists(
- path)
+ return (
+ True if path == dsaz.RESOURCE_DISK_PATH else orig_exists(path)
+ )
- with mock.patch(MOCKPATH + 'os.path.exists', new=changed_exists):
+ with mock.patch(MOCKPATH + "os.path.exists", new=changed_exists):
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
cfg = dsrc.get_config_obj()
- self.assertEqual(dsrc.device_name_to_device("ephemeral0"),
- dsaz.RESOURCE_DISK_PATH)
- assert 'disk_setup' in cfg
- assert 'fs_setup' in cfg
- self.assertIsInstance(cfg['disk_setup'], dict)
- self.assertIsInstance(cfg['fs_setup'], list)
+ self.assertEqual(
+ dsrc.device_name_to_device("ephemeral0"),
+ dsaz.RESOURCE_DISK_PATH,
+ )
+ assert "disk_setup" in cfg
+ assert "fs_setup" in cfg
+ self.assertIsInstance(cfg["disk_setup"], dict)
+ self.assertIsInstance(cfg["fs_setup"], list)
def test_default_ephemeral_configs_ephemeral_does_not_exist(self):
# make sure the ephemeral configs are correct if disk not present
odata = {}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': {}}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": {},
+ }
orig_exists = dsaz.os.path.exists
def changed_exists(path):
- return False if path == dsaz.RESOURCE_DISK_PATH else orig_exists(
- path)
+ return (
+ False if path == dsaz.RESOURCE_DISK_PATH else orig_exists(path)
+ )
- with mock.patch(MOCKPATH + 'os.path.exists', new=changed_exists):
+ with mock.patch(MOCKPATH + "os.path.exists", new=changed_exists):
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
cfg = dsrc.get_config_obj()
- assert 'disk_setup' not in cfg
- assert 'fs_setup' not in cfg
+ assert "disk_setup" not in cfg
+ assert "fs_setup" not in cfg
def test_provide_disk_aliases(self):
# Make sure that user can affect disk aliases
- dscfg = {'disk_aliases': {'ephemeral0': '/dev/sdc'}}
- odata = {'HostName': "myhost", 'UserName': "myuser",
- 'dscfg': {'text': b64e(yaml.dump(dscfg)),
- 'encoding': 'base64'}}
- usercfg = {'disk_setup': {'/dev/sdc': {'something': '...'},
- 'ephemeral0': False}}
- userdata = '#cloud-config' + yaml.dump(usercfg) + "\n"
+ dscfg = {"disk_aliases": {"ephemeral0": "/dev/sdc"}}
+ odata = {
+ "HostName": "myhost",
+ "UserName": "myuser",
+ "dscfg": {"text": b64e(yaml.dump(dscfg)), "encoding": "base64"},
+ }
+ usercfg = {
+ "disk_setup": {
+ "/dev/sdc": {"something": "..."},
+ "ephemeral0": False,
+ }
+ }
+ userdata = "#cloud-config" + yaml.dump(usercfg) + "\n"
ovfcontent = construct_valid_ovf_env(data=odata, userdata=userdata)
- data = {'ovfcontent': ovfcontent, 'sys_cfg': {}}
+ data = {"ovfcontent": ovfcontent, "sys_cfg": {}}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
@@ -1394,92 +1623,95 @@ scbus-1 on xpt0 bus 0
def test_userdata_arrives(self):
userdata = "This is my user-data"
xml = construct_valid_ovf_env(data={}, userdata=userdata)
- data = {'ovfcontent': xml}
+ data = {"ovfcontent": xml}
dsrc = self._get_ds(data)
dsrc.get_data()
- self.assertEqual(userdata.encode('us-ascii'), dsrc.userdata_raw)
+ self.assertEqual(userdata.encode("us-ascii"), dsrc.userdata_raw)
def test_password_redacted_in_ovf(self):
- odata = {'HostName': "myhost", 'UserName': "myuser",
- 'UserPassword': "mypass"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ odata = {
+ "HostName": "myhost",
+ "UserName": "myuser",
+ "UserPassword": "mypass",
+ }
+ data = {"ovfcontent": construct_valid_ovf_env(data=odata)}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
- ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml')
+ ovf_env_path = os.path.join(self.waagent_d, "ovf-env.xml")
# The XML should not be same since the user password is redacted
on_disk_ovf = load_file(ovf_env_path)
- self.xml_notequals(data['ovfcontent'], on_disk_ovf)
+ self.xml_notequals(data["ovfcontent"], on_disk_ovf)
# Make sure that the redacted password on disk is not used by CI
- self.assertNotEqual(dsrc.cfg.get('password'),
- dsaz.DEF_PASSWD_REDACTION)
+ self.assertNotEqual(
+ dsrc.cfg.get("password"), dsaz.DEF_PASSWD_REDACTION
+ )
# Make sure that the password was really encrypted
et = ET.fromstring(on_disk_ovf)
for elem in et.iter():
- if 'UserPassword' in elem.tag:
+ if "UserPassword" in elem.tag:
self.assertEqual(dsaz.DEF_PASSWD_REDACTION, elem.text)
def test_ovf_env_arrives_in_waagent_dir(self):
xml = construct_valid_ovf_env(data={}, userdata="FOODATA")
- dsrc = self._get_ds({'ovfcontent': xml})
+ dsrc = self._get_ds({"ovfcontent": xml})
dsrc.get_data()
# 'data_dir' is '/var/lib/waagent' (walinux-agent's state dir)
# we expect that the ovf-env.xml file is copied there.
- ovf_env_path = os.path.join(self.waagent_d, 'ovf-env.xml')
+ ovf_env_path = os.path.join(self.waagent_d, "ovf-env.xml")
self.assertTrue(os.path.exists(ovf_env_path))
self.xml_equals(xml, load_file(ovf_env_path))
def test_ovf_can_include_unicode(self):
xml = construct_valid_ovf_env(data={})
- xml = '\ufeff{0}'.format(xml)
- dsrc = self._get_ds({'ovfcontent': xml})
+ xml = "\ufeff{0}".format(xml)
+ dsrc = self._get_ds({"ovfcontent": xml})
dsrc.get_data()
- def test_dsaz_report_ready_returns_true_when_report_succeeds(
- self):
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ def test_dsaz_report_ready_returns_true_when_report_succeeds(self):
+ dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
self.assertTrue(dsrc._report_ready(lease=mock.MagicMock()))
- def test_dsaz_report_ready_returns_false_and_does_not_propagate_exc(
- self):
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ def test_dsaz_report_ready_returns_false_and_does_not_propagate_exc(self):
+ dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
self.m_get_metadata_from_fabric.side_effect = Exception
self.assertFalse(dsrc._report_ready(lease=mock.MagicMock()))
def test_dsaz_report_failure_returns_true_when_report_succeeds(self):
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
- with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata:
+ with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata:
# mock crawl metadata failure to cause report failure
m_crawl_metadata.side_effect = Exception
self.assertTrue(dsrc._report_failure())
- self.assertEqual(
- 1,
- self.m_report_failure_to_fabric.call_count)
+ self.assertEqual(1, self.m_report_failure_to_fabric.call_count)
def test_dsaz_report_failure_returns_false_and_does_not_propagate_exc(
- self):
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
-
- with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata, \
- mock.patch.object(dsrc, '_ephemeral_dhcp_ctx') \
- as m_ephemeral_dhcp_ctx, \
- mock.patch.object(dsrc.distro.networking, 'is_up') \
- as m_dsrc_distro_networking_is_up:
+ self,
+ ):
+ dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
+
+ with mock.patch.object(
+ dsrc, "crawl_metadata"
+ ) as m_crawl_metadata, mock.patch.object(
+ dsrc, "_ephemeral_dhcp_ctx"
+ ) as m_ephemeral_dhcp_ctx, mock.patch.object(
+ dsrc.distro.networking, "is_up"
+ ) as m_dsrc_distro_networking_is_up:
# mock crawl metadata failure to cause report failure
m_crawl_metadata.side_effect = Exception
# setup mocks to allow using cached ephemeral dhcp lease
m_dsrc_distro_networking_is_up.return_value = True
- test_lease_dhcp_option_245 = 'test_lease_dhcp_option_245'
- test_lease = {'unknown-245': test_lease_dhcp_option_245}
+ test_lease_dhcp_option_245 = "test_lease_dhcp_option_245"
+ test_lease = {"unknown-245": test_lease_dhcp_option_245}
m_ephemeral_dhcp_ctx.lease = test_lease
# We expect 3 calls to report_failure_to_fabric,
@@ -1490,91 +1722,97 @@ scbus-1 on xpt0 bus 0
# 3. Using fallback lease to report failure to Azure
self.m_report_failure_to_fabric.side_effect = Exception
self.assertFalse(dsrc._report_failure())
- self.assertEqual(
- 3,
- self.m_report_failure_to_fabric.call_count)
+ self.assertEqual(3, self.m_report_failure_to_fabric.call_count)
def test_dsaz_report_failure_description_msg(self):
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
- with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata:
+ with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata:
# mock crawl metadata failure to cause report failure
m_crawl_metadata.side_effect = Exception
- test_msg = 'Test report failure description message'
+ test_msg = "Test report failure description message"
self.assertTrue(dsrc._report_failure(description=test_msg))
self.m_report_failure_to_fabric.assert_called_once_with(
- dhcp_opts=mock.ANY, description=test_msg)
+ dhcp_opts=mock.ANY, description=test_msg
+ )
def test_dsaz_report_failure_no_description_msg(self):
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
- with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata:
+ with mock.patch.object(dsrc, "crawl_metadata") as m_crawl_metadata:
m_crawl_metadata.side_effect = Exception
self.assertTrue(dsrc._report_failure()) # no description msg
self.m_report_failure_to_fabric.assert_called_once_with(
- dhcp_opts=mock.ANY, description=None)
+ dhcp_opts=mock.ANY, description=None
+ )
def test_dsaz_report_failure_uses_cached_ephemeral_dhcp_ctx_lease(self):
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
-
- with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata, \
- mock.patch.object(dsrc, '_ephemeral_dhcp_ctx') \
- as m_ephemeral_dhcp_ctx, \
- mock.patch.object(dsrc.distro.networking, 'is_up') \
- as m_dsrc_distro_networking_is_up:
+ dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
+
+ with mock.patch.object(
+ dsrc, "crawl_metadata"
+ ) as m_crawl_metadata, mock.patch.object(
+ dsrc, "_ephemeral_dhcp_ctx"
+ ) as m_ephemeral_dhcp_ctx, mock.patch.object(
+ dsrc.distro.networking, "is_up"
+ ) as m_dsrc_distro_networking_is_up:
# mock crawl metadata failure to cause report failure
m_crawl_metadata.side_effect = Exception
# setup mocks to allow using cached ephemeral dhcp lease
m_dsrc_distro_networking_is_up.return_value = True
- test_lease_dhcp_option_245 = 'test_lease_dhcp_option_245'
- test_lease = {'unknown-245': test_lease_dhcp_option_245}
+ test_lease_dhcp_option_245 = "test_lease_dhcp_option_245"
+ test_lease = {"unknown-245": test_lease_dhcp_option_245}
m_ephemeral_dhcp_ctx.lease = test_lease
self.assertTrue(dsrc._report_failure())
# ensure called with cached ephemeral dhcp lease option 245
self.m_report_failure_to_fabric.assert_called_once_with(
- description=mock.ANY, dhcp_opts=test_lease_dhcp_option_245)
+ description=mock.ANY, dhcp_opts=test_lease_dhcp_option_245
+ )
# ensure cached ephemeral is cleaned
- self.assertEqual(
- 1,
- m_ephemeral_dhcp_ctx.clean_network.call_count)
+ self.assertEqual(1, m_ephemeral_dhcp_ctx.clean_network.call_count)
def test_dsaz_report_failure_no_net_uses_new_ephemeral_dhcp_lease(self):
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
- with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata, \
- mock.patch.object(dsrc.distro.networking, 'is_up') \
- as m_dsrc_distro_networking_is_up:
+ with mock.patch.object(
+ dsrc, "crawl_metadata"
+ ) as m_crawl_metadata, mock.patch.object(
+ dsrc.distro.networking, "is_up"
+ ) as m_dsrc_distro_networking_is_up:
# mock crawl metadata failure to cause report failure
m_crawl_metadata.side_effect = Exception
# net is not up and cannot use cached ephemeral dhcp
m_dsrc_distro_networking_is_up.return_value = False
# setup ephemeral dhcp lease discovery mock
- test_lease_dhcp_option_245 = 'test_lease_dhcp_option_245'
- test_lease = {'unknown-245': test_lease_dhcp_option_245}
- self.m_ephemeral_dhcpv4_with_reporting.return_value \
- .__enter__.return_value = test_lease
+ test_lease_dhcp_option_245 = "test_lease_dhcp_option_245"
+ test_lease = {"unknown-245": test_lease_dhcp_option_245}
+ self.m_ephemeral_dhcpv4_with_reporting.return_value.__enter__.return_value = ( # noqa: E501
+ test_lease
+ )
self.assertTrue(dsrc._report_failure())
# ensure called with the newly discovered
# ephemeral dhcp lease option 245
self.m_report_failure_to_fabric.assert_called_once_with(
- description=mock.ANY, dhcp_opts=test_lease_dhcp_option_245)
+ description=mock.ANY, dhcp_opts=test_lease_dhcp_option_245
+ )
- def test_dsaz_report_failure_no_net_and_no_dhcp_uses_fallback_lease(
- self):
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ def test_dsaz_report_failure_no_net_and_no_dhcp_uses_fallback_lease(self):
+ dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
- with mock.patch.object(dsrc, 'crawl_metadata') as m_crawl_metadata, \
- mock.patch.object(dsrc.distro.networking, 'is_up') \
- as m_dsrc_distro_networking_is_up:
+ with mock.patch.object(
+ dsrc, "crawl_metadata"
+ ) as m_crawl_metadata, mock.patch.object(
+ dsrc.distro.networking, "is_up"
+ ) as m_dsrc_distro_networking_is_up:
# mock crawl metadata failure to cause report failure
m_crawl_metadata.side_effect = Exception
@@ -1582,29 +1820,31 @@ scbus-1 on xpt0 bus 0
m_dsrc_distro_networking_is_up.return_value = False
# ephemeral dhcp discovery failure,
# so cannot use a new ephemeral dhcp
- self.m_ephemeral_dhcpv4_with_reporting.return_value \
- .__enter__.side_effect = Exception
+ self.m_ephemeral_dhcpv4_with_reporting.return_value.__enter__.side_effect = ( # noqa: E501
+ Exception
+ )
self.assertTrue(dsrc._report_failure())
# ensure called with fallback lease
self.m_report_failure_to_fabric.assert_called_once_with(
description=mock.ANY,
- fallback_lease_file=dsrc.dhclient_lease_file)
+ fallback_lease_file=dsrc.dhclient_lease_file,
+ )
def test_exception_fetching_fabric_data_doesnt_propagate(self):
"""Errors communicating with fabric should warn, but return True."""
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
self.m_get_metadata_from_fabric.side_effect = Exception
ret = self._get_and_setup(dsrc)
self.assertTrue(ret)
def test_fabric_data_included_in_metadata(self):
- dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
- self.m_get_metadata_from_fabric.return_value = {'test': 'value'}
+ dsrc = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
+ self.m_get_metadata_from_fabric.return_value = {"test": "value"}
ret = self._get_and_setup(dsrc)
self.assertTrue(ret)
- self.assertEqual('value', dsrc.metadata['test'])
+ self.assertEqual("value", dsrc.metadata["test"])
def test_instance_id_case_insensitive(self):
"""Return the previous iid when current is a case-insensitive match."""
@@ -1612,152 +1852,180 @@ scbus-1 on xpt0 bus 0
upper_iid = EXAMPLE_UUID.upper()
# lowercase current UUID
ds = self._get_ds(
- {'ovfcontent': construct_valid_ovf_env()}, instance_id=lower_iid
+ {"ovfcontent": construct_valid_ovf_env()}, instance_id=lower_iid
)
# UPPERCASE previous
write_file(
- os.path.join(self.paths.cloud_dir, 'data', 'instance-id'),
- upper_iid)
+ os.path.join(self.paths.cloud_dir, "data", "instance-id"),
+ upper_iid,
+ )
ds.get_data()
- self.assertEqual(upper_iid, ds.metadata['instance-id'])
+ self.assertEqual(upper_iid, ds.metadata["instance-id"])
# UPPERCASE current UUID
ds = self._get_ds(
- {'ovfcontent': construct_valid_ovf_env()}, instance_id=upper_iid
+ {"ovfcontent": construct_valid_ovf_env()}, instance_id=upper_iid
)
# lowercase previous
write_file(
- os.path.join(self.paths.cloud_dir, 'data', 'instance-id'),
- lower_iid)
+ os.path.join(self.paths.cloud_dir, "data", "instance-id"),
+ lower_iid,
+ )
ds.get_data()
- self.assertEqual(lower_iid, ds.metadata['instance-id'])
+ self.assertEqual(lower_iid, ds.metadata["instance-id"])
def test_instance_id_endianness(self):
"""Return the previous iid when dmi uuid is the byteswapped iid."""
- ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ ds = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
# byte-swapped previous
write_file(
- os.path.join(self.paths.cloud_dir, 'data', 'instance-id'),
- '544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8')
+ os.path.join(self.paths.cloud_dir, "data", "instance-id"),
+ "544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8",
+ )
ds.get_data()
self.assertEqual(
- '544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8', ds.metadata['instance-id'])
+ "544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8", ds.metadata["instance-id"]
+ )
# not byte-swapped previous
write_file(
- os.path.join(self.paths.cloud_dir, 'data', 'instance-id'),
- '644CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8')
+ os.path.join(self.paths.cloud_dir, "data", "instance-id"),
+ "644CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8",
+ )
ds.get_data()
- self.assertEqual(self.instance_id, ds.metadata['instance-id'])
+ self.assertEqual(self.instance_id, ds.metadata["instance-id"])
def test_instance_id_from_dmidecode_used(self):
- ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ ds = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
ds.get_data()
- self.assertEqual(self.instance_id, ds.metadata['instance-id'])
+ self.assertEqual(self.instance_id, ds.metadata["instance-id"])
def test_instance_id_from_dmidecode_used_for_builtin(self):
- ds = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
+ ds = self._get_ds({"ovfcontent": construct_valid_ovf_env()})
ds.get_data()
- self.assertEqual(self.instance_id, ds.metadata['instance-id'])
+ self.assertEqual(self.instance_id, ds.metadata["instance-id"])
- @mock.patch(MOCKPATH + 'util.is_FreeBSD')
- @mock.patch(MOCKPATH + '_check_freebsd_cdrom')
- def test_list_possible_azure_ds(self, m_check_fbsd_cdrom,
- m_is_FreeBSD):
+ @mock.patch(MOCKPATH + "util.is_FreeBSD")
+ @mock.patch(MOCKPATH + "_check_freebsd_cdrom")
+ def test_list_possible_azure_ds(self, m_check_fbsd_cdrom, m_is_FreeBSD):
"""On FreeBSD, possible devs should show /dev/cd0."""
m_is_FreeBSD.return_value = True
m_check_fbsd_cdrom.return_value = True
possible_ds = []
- for src in dsaz.list_possible_azure_ds(
- "seed_dir", "cache_dir"):
+ for src in dsaz.list_possible_azure_ds("seed_dir", "cache_dir"):
possible_ds.append(src)
- self.assertEqual(possible_ds, ["seed_dir",
- dsaz.DEFAULT_PROVISIONING_ISO_DEV,
- "/dev/cd0",
- "cache_dir"])
self.assertEqual(
- [mock.call("/dev/cd0")], m_check_fbsd_cdrom.call_args_list)
+ possible_ds,
+ [
+ "seed_dir",
+ dsaz.DEFAULT_PROVISIONING_ISO_DEV,
+ "/dev/cd0",
+ "cache_dir",
+ ],
+ )
+ self.assertEqual(
+ [mock.call("/dev/cd0")], m_check_fbsd_cdrom.call_args_list
+ )
- @mock.patch('cloudinit.sources.DataSourceAzure.device_driver',
- return_value=None)
- @mock.patch('cloudinit.net.generate_fallback_config')
+ @mock.patch(
+ "cloudinit.sources.DataSourceAzure.device_driver", return_value=None
+ )
+ @mock.patch("cloudinit.net.generate_fallback_config")
def test_imds_network_config(self, mock_fallback, m_driver):
"""Network config is generated from IMDS network data when present."""
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
+ }
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
expected_cfg = {
- 'ethernets': {
- 'eth0': {'dhcp4': True,
- 'dhcp4-overrides': {'route-metric': 100},
- 'dhcp6': False,
- 'match': {'macaddress': '00:0d:3a:04:75:98'},
- 'set-name': 'eth0'}},
- 'version': 2}
+ "ethernets": {
+ "eth0": {
+ "dhcp4": True,
+ "dhcp4-overrides": {"route-metric": 100},
+ "dhcp6": False,
+ "match": {"macaddress": "00:0d:3a:04:75:98"},
+ "set-name": "eth0",
+ }
+ },
+ "version": 2,
+ }
self.assertEqual(expected_cfg, dsrc.network_config)
mock_fallback.assert_not_called()
- @mock.patch('cloudinit.net.get_interface_mac')
- @mock.patch('cloudinit.net.get_devicelist')
- @mock.patch('cloudinit.net.device_driver')
- @mock.patch('cloudinit.net.generate_fallback_config')
+ @mock.patch("cloudinit.net.get_interface_mac")
+ @mock.patch("cloudinit.net.get_devicelist")
+ @mock.patch("cloudinit.net.device_driver")
+ @mock.patch("cloudinit.net.generate_fallback_config")
def test_imds_network_ignored_when_apply_network_config_false(
- self, mock_fallback, mock_dd, mock_devlist, mock_get_mac):
+ self, mock_fallback, mock_dd, mock_devlist, mock_get_mac
+ ):
"""When apply_network_config is False, use fallback instead of IMDS."""
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': False}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": False}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
+ }
fallback_config = {
- 'version': 1,
- 'config': [{
- 'type': 'physical', 'name': 'eth0',
- 'mac_address': '00:11:22:33:44:55',
- 'params': {'driver': 'hv_netsvc'},
- 'subnets': [{'type': 'dhcp'}],
- }]
+ "version": 1,
+ "config": [
+ {
+ "type": "physical",
+ "name": "eth0",
+ "mac_address": "00:11:22:33:44:55",
+ "params": {"driver": "hv_netsvc"},
+ "subnets": [{"type": "dhcp"}],
+ }
+ ],
}
mock_fallback.return_value = fallback_config
- mock_devlist.return_value = ['eth0']
- mock_dd.return_value = ['hv_netsvc']
- mock_get_mac.return_value = '00:11:22:33:44:55'
+ mock_devlist.return_value = ["eth0"]
+ mock_dd.return_value = ["hv_netsvc"]
+ mock_get_mac.return_value = "00:11:22:33:44:55"
dsrc = self._get_ds(data)
self.assertTrue(dsrc.get_data())
self.assertEqual(dsrc.network_config, fallback_config)
- @mock.patch('cloudinit.net.get_interface_mac')
- @mock.patch('cloudinit.net.get_devicelist')
- @mock.patch('cloudinit.net.device_driver')
- @mock.patch('cloudinit.net.generate_fallback_config', autospec=True)
- def test_fallback_network_config(self, mock_fallback, mock_dd,
- mock_devlist, mock_get_mac):
+ @mock.patch("cloudinit.net.get_interface_mac")
+ @mock.patch("cloudinit.net.get_devicelist")
+ @mock.patch("cloudinit.net.device_driver")
+ @mock.patch("cloudinit.net.generate_fallback_config", autospec=True)
+ def test_fallback_network_config(
+ self, mock_fallback, mock_dd, mock_devlist, mock_get_mac
+ ):
"""On absent IMDS network data, generate network fallback config."""
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': {}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": {},
+ }
fallback_config = {
- 'version': 1,
- 'config': [{
- 'type': 'physical', 'name': 'eth0',
- 'mac_address': '00:11:22:33:44:55',
- 'params': {'driver': 'hv_netsvc'},
- 'subnets': [{'type': 'dhcp'}],
- }]
+ "version": 1,
+ "config": [
+ {
+ "type": "physical",
+ "name": "eth0",
+ "mac_address": "00:11:22:33:44:55",
+ "params": {"driver": "hv_netsvc"},
+ "subnets": [{"type": "dhcp"}],
+ }
+ ],
}
mock_fallback.return_value = fallback_config
- mock_devlist.return_value = ['eth0']
- mock_dd.return_value = ['hv_netsvc']
- mock_get_mac.return_value = '00:11:22:33:44:55'
+ mock_devlist.return_value = ["eth0"]
+ mock_dd.return_value = ["hv_netsvc"]
+ mock_get_mac.return_value = "00:11:22:33:44:55"
dsrc = self._get_ds(data)
# Represent empty response from network imds
@@ -1768,37 +2036,41 @@ scbus-1 on xpt0 bus 0
netconfig = dsrc.network_config
self.assertEqual(netconfig, fallback_config)
mock_fallback.assert_called_with(
- blacklist_drivers=['mlx4_core', 'mlx5_core'],
- config_driver=True)
+ blacklist_drivers=["mlx4_core", "mlx5_core"], config_driver=True
+ )
- @mock.patch(MOCKPATH + 'net.get_interfaces', autospec=True)
- def test_blacklist_through_distro(
- self, m_net_get_interfaces):
+ @mock.patch(MOCKPATH + "net.get_interfaces", autospec=True)
+ def test_blacklist_through_distro(self, m_net_get_interfaces):
"""Verify Azure DS updates blacklist drivers in the distro's
- networking object."""
- odata = {'HostName': "myhost", 'UserName': "myuser"}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': {}}
+ networking object."""
+ odata = {"HostName": "myhost", "UserName": "myuser"}
+ data = {
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": {},
+ }
- distro_cls = distros.fetch('ubuntu')
- distro = distro_cls('ubuntu', {}, self.paths)
+ distro_cls = distros.fetch("ubuntu")
+ distro = distro_cls("ubuntu", {}, self.paths)
dsrc = self._get_ds(data, distro=distro)
dsrc.get_data()
- self.assertEqual(distro.networking.blacklist_drivers,
- dsaz.BLACKLIST_DRIVERS)
+ self.assertEqual(
+ distro.networking.blacklist_drivers, dsaz.BLACKLIST_DRIVERS
+ )
distro.networking.get_interfaces_by_mac()
m_net_get_interfaces.assert_called_with(
- blacklist_drivers=dsaz.BLACKLIST_DRIVERS)
+ blacklist_drivers=dsaz.BLACKLIST_DRIVERS
+ )
@mock.patch(
- 'cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates')
+ "cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates"
+ )
def test_get_public_ssh_keys_with_imds(self, m_parse_certificates):
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
data = {
- 'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
}
dsrc = self._get_ds(data)
dsrc.get_data()
@@ -1808,32 +2080,32 @@ scbus-1 on xpt0 bus 0
self.assertEqual(m_parse_certificates.call_count, 0)
def test_key_without_crlf_valid(self):
- test_key = 'ssh-rsa somerandomkeystuff some comment'
+ test_key = "ssh-rsa somerandomkeystuff some comment"
assert True is dsaz._key_is_openssh_formatted(test_key)
def test_key_with_crlf_invalid(self):
- test_key = 'ssh-rsa someran\r\ndomkeystuff some comment'
+ test_key = "ssh-rsa someran\r\ndomkeystuff some comment"
assert False is dsaz._key_is_openssh_formatted(test_key)
def test_key_endswith_crlf_valid(self):
- test_key = 'ssh-rsa somerandomkeystuff some comment\r\n'
+ test_key = "ssh-rsa somerandomkeystuff some comment\r\n"
assert True is dsaz._key_is_openssh_formatted(test_key)
@mock.patch(
- 'cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates')
- @mock.patch(MOCKPATH + 'get_metadata_from_imds')
+ "cloudinit.sources.helpers.azure.OpenSSLManager.parse_certificates"
+ )
+ @mock.patch(MOCKPATH + "get_metadata_from_imds")
def test_get_public_ssh_keys_with_no_openssh_format(
- self,
- m_get_metadata_from_imds,
- m_parse_certificates):
+ self, m_get_metadata_from_imds, m_parse_certificates
+ ):
imds_data = copy.deepcopy(NETWORK_METADATA)
- imds_data['compute']['publicKeys'][0]['keyData'] = 'no-openssh-format'
+ imds_data["compute"]["publicKeys"][0]["keyData"] = "no-openssh-format"
m_get_metadata_from_imds.return_value = imds_data
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
data = {
- 'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
}
dsrc = self._get_ds(data)
dsrc.get_data()
@@ -1842,38 +2114,37 @@ scbus-1 on xpt0 bus 0
self.assertEqual(ssh_keys, [])
self.assertEqual(m_parse_certificates.call_count, 0)
- @mock.patch(MOCKPATH + 'get_metadata_from_imds')
- def test_get_public_ssh_keys_without_imds(
- self,
- m_get_metadata_from_imds):
+ @mock.patch(MOCKPATH + "get_metadata_from_imds")
+ def test_get_public_ssh_keys_without_imds(self, m_get_metadata_from_imds):
m_get_metadata_from_imds.return_value = dict()
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
data = {
- 'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
}
dsrc = self._get_ds(data)
- dsaz.get_metadata_from_fabric.return_value = {'public-keys': ['key2']}
+ dsaz.get_metadata_from_fabric.return_value = {"public-keys": ["key2"]}
dsrc.get_data()
dsrc.setup(True)
ssh_keys = dsrc.get_public_ssh_keys()
- self.assertEqual(ssh_keys, ['key2'])
+ self.assertEqual(ssh_keys, ["key2"])
- @mock.patch(MOCKPATH + 'get_metadata_from_imds')
+ @mock.patch(MOCKPATH + "get_metadata_from_imds")
def test_imds_api_version_wanted_nonexistent(
- self,
- m_get_metadata_from_imds):
+ self, m_get_metadata_from_imds
+ ):
def get_metadata_from_imds_side_eff(*args, **kwargs):
- if kwargs['api_version'] == dsaz.IMDS_VER_WANT:
+ if kwargs["api_version"] == dsaz.IMDS_VER_WANT:
raise url_helper.UrlError("No IMDS version", code=400)
return NETWORK_METADATA
+
m_get_metadata_from_imds.side_effect = get_metadata_from_imds_side_eff
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
data = {
- 'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
}
dsrc = self._get_ds(data)
dsrc.get_data()
@@ -1881,86 +2152,86 @@ scbus-1 on xpt0 bus 0
self.assertTrue(dsrc.failed_desired_api_version)
@mock.patch(
- MOCKPATH + 'get_metadata_from_imds', return_value=NETWORK_METADATA)
+ MOCKPATH + "get_metadata_from_imds", return_value=NETWORK_METADATA
+ )
def test_imds_api_version_wanted_exists(self, m_get_metadata_from_imds):
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
data = {
- 'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
}
dsrc = self._get_ds(data)
dsrc.get_data()
self.assertIsNotNone(dsrc.metadata)
self.assertFalse(dsrc.failed_desired_api_version)
- @mock.patch(MOCKPATH + 'get_metadata_from_imds')
+ @mock.patch(MOCKPATH + "get_metadata_from_imds")
def test_hostname_from_imds(self, m_get_metadata_from_imds):
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
data = {
- 'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
}
imds_data_with_os_profile = copy.deepcopy(NETWORK_METADATA)
imds_data_with_os_profile["compute"]["osProfile"] = dict(
adminUsername="username1",
computerName="hostname1",
- disablePasswordAuthentication="true"
+ disablePasswordAuthentication="true",
)
m_get_metadata_from_imds.return_value = imds_data_with_os_profile
dsrc = self._get_ds(data)
dsrc.get_data()
self.assertEqual(dsrc.metadata["local-hostname"], "hostname1")
- @mock.patch(MOCKPATH + 'get_metadata_from_imds')
+ @mock.patch(MOCKPATH + "get_metadata_from_imds")
def test_username_from_imds(self, m_get_metadata_from_imds):
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
data = {
- 'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
}
imds_data_with_os_profile = copy.deepcopy(NETWORK_METADATA)
imds_data_with_os_profile["compute"]["osProfile"] = dict(
adminUsername="username1",
computerName="hostname1",
- disablePasswordAuthentication="true"
+ disablePasswordAuthentication="true",
)
m_get_metadata_from_imds.return_value = imds_data_with_os_profile
dsrc = self._get_ds(data)
dsrc.get_data()
self.assertEqual(
- dsrc.cfg["system_info"]["default_user"]["name"],
- "username1"
+ dsrc.cfg["system_info"]["default_user"]["name"], "username1"
)
- @mock.patch(MOCKPATH + 'get_metadata_from_imds')
+ @mock.patch(MOCKPATH + "get_metadata_from_imds")
def test_disable_password_from_imds(self, m_get_metadata_from_imds):
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
data = {
- 'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
}
imds_data_with_os_profile = copy.deepcopy(NETWORK_METADATA)
imds_data_with_os_profile["compute"]["osProfile"] = dict(
adminUsername="username1",
computerName="hostname1",
- disablePasswordAuthentication="true"
+ disablePasswordAuthentication="true",
)
m_get_metadata_from_imds.return_value = imds_data_with_os_profile
dsrc = self._get_ds(data)
dsrc.get_data()
self.assertTrue(dsrc.metadata["disable_password"])
- @mock.patch(MOCKPATH + 'get_metadata_from_imds')
+ @mock.patch(MOCKPATH + "get_metadata_from_imds")
def test_userdata_from_imds(self, m_get_metadata_from_imds):
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
- odata = {'HostName': "myhost", 'UserName': "myuser"}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
+ odata = {"HostName": "myhost", "UserName": "myuser"}
data = {
- 'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
}
userdata = "userdataImds"
imds_data = copy.deepcopy(NETWORK_METADATA)
@@ -1974,20 +2245,22 @@ scbus-1 on xpt0 bus 0
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, userdata.encode('utf-8'))
+ self.assertEqual(dsrc.userdata_raw, userdata.encode("utf-8"))
- @mock.patch(MOCKPATH + 'get_metadata_from_imds')
+ @mock.patch(MOCKPATH + "get_metadata_from_imds")
def test_userdata_from_imds_with_customdata_from_OVF(
- self, m_get_metadata_from_imds):
+ self, m_get_metadata_from_imds
+ ):
userdataOVF = "userdataOVF"
odata = {
- 'HostName': "myhost", 'UserName': "myuser",
- 'UserData': {'text': b64e(userdataOVF), 'encoding': 'base64'}
+ "HostName": "myhost",
+ "UserName": "myuser",
+ "UserData": {"text": b64e(userdataOVF), "encoding": "base64"},
}
- sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
+ sys_cfg = {"datasource": {"Azure": {"apply_network_config": True}}}
data = {
- 'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': sys_cfg
+ "ovfcontent": construct_valid_ovf_env(data=odata),
+ "sys_cfg": sys_cfg,
}
userdataImds = "userdataImds"
@@ -2002,7 +2275,7 @@ scbus-1 on xpt0 bus 0
dsrc = self._get_ds(data)
ret = dsrc.get_data()
self.assertTrue(ret)
- self.assertEqual(dsrc.userdata_raw, userdataOVF.encode('utf-8'))
+ self.assertEqual(dsrc.userdata_raw, userdataOVF.encode("utf-8"))
class TestLoadAzureDsDir(CiTestCase):
@@ -2017,39 +2290,40 @@ class TestLoadAzureDsDir(CiTestCase):
with self.assertRaises(dsaz.NonAzureDataSource) as context_manager:
dsaz.load_azure_ds_dir(self.source_dir)
self.assertEqual(
- 'No ovf-env file found',
- str(context_manager.exception))
+ "No ovf-env file found", str(context_manager.exception)
+ )
def test_wb_invalid_ovf_env_xml_calls_read_azure_ovf(self):
"""load_azure_ds_dir calls read_azure_ovf to parse the xml."""
- ovf_path = os.path.join(self.source_dir, 'ovf-env.xml')
- with open(ovf_path, 'wb') as stream:
- stream.write(b'invalid xml')
+ ovf_path = os.path.join(self.source_dir, "ovf-env.xml")
+ with open(ovf_path, "wb") as stream:
+ stream.write(b"invalid xml")
with self.assertRaises(dsaz.BrokenAzureDataSource) as context_manager:
dsaz.load_azure_ds_dir(self.source_dir)
self.assertEqual(
- 'Invalid ovf-env.xml: syntax error: line 1, column 0',
- str(context_manager.exception))
+ "Invalid ovf-env.xml: syntax error: line 1, column 0",
+ str(context_manager.exception),
+ )
class TestReadAzureOvf(CiTestCase):
-
def test_invalid_xml_raises_non_azure_ds(self):
invalid_xml = "<foo>" + construct_valid_ovf_env(data={})
- self.assertRaises(dsaz.BrokenAzureDataSource,
- dsaz.read_azure_ovf, invalid_xml)
+ self.assertRaises(
+ dsaz.BrokenAzureDataSource, dsaz.read_azure_ovf, invalid_xml
+ )
def test_load_with_pubkeys(self):
- mypklist = [{'fingerprint': 'fp1', 'path': 'path1', 'value': ''}]
- pubkeys = [(x['fingerprint'], x['path'], x['value']) for x in mypklist]
+ mypklist = [{"fingerprint": "fp1", "path": "path1", "value": ""}]
+ pubkeys = [(x["fingerprint"], x["path"], x["value"]) for x in mypklist]
content = construct_valid_ovf_env(pubkeys=pubkeys)
(_md, _ud, cfg) = dsaz.read_azure_ovf(content)
for mypk in mypklist:
- self.assertIn(mypk, cfg['_pubkeys'])
+ self.assertIn(mypk, cfg["_pubkeys"])
class TestCanDevBeReformatted(CiTestCase):
- warning_file = 'dataloss_warning_readme.txt'
+ warning_file = "dataloss_warning_readme.txt"
def _domock(self, mockpath, sattr=None):
patcher = mock.patch(mockpath)
@@ -2060,42 +2334,42 @@ class TestCanDevBeReformatted(CiTestCase):
bypath = {}
for path, data in devs.items():
bypath[path] = data
- if 'realpath' in data:
- bypath[data['realpath']] = data
- for ppath, pdata in data.get('partitions', {}).items():
+ if "realpath" in data:
+ bypath[data["realpath"]] = data
+ for ppath, pdata in data.get("partitions", {}).items():
bypath[ppath] = pdata
- if 'realpath' in data:
- bypath[pdata['realpath']] = pdata
+ if "realpath" in data:
+ bypath[pdata["realpath"]] = pdata
def realpath(d):
- return bypath[d].get('realpath', d)
+ return bypath[d].get("realpath", d)
def partitions_on_device(devpath):
- parts = bypath.get(devpath, {}).get('partitions', {})
+ parts = bypath.get(devpath, {}).get("partitions", {})
ret = []
for path, data in parts.items():
- ret.append((data.get('num'), realpath(path)))
+ ret.append((data.get("num"), realpath(path)))
# return sorted by partition number
return sorted(ret, key=lambda d: d[0])
def mount_cb(device, callback, mtype, update_env_for_mount):
- self.assertEqual('ntfs', mtype)
- self.assertEqual('C', update_env_for_mount.get('LANG'))
+ self.assertEqual("ntfs", mtype)
+ self.assertEqual("C", update_env_for_mount.get("LANG"))
p = self.tmp_dir()
- for f in bypath.get(device).get('files', []):
+ for f in bypath.get(device).get("files", []):
write_file(os.path.join(p, f), content=f)
return callback(p)
def has_ntfs_fs(device):
- return bypath.get(device, {}).get('fs') == 'ntfs'
+ return bypath.get(device, {}).get("fs") == "ntfs"
p = MOCKPATH
- self._domock(p + "_partitions_on_device", 'm_partitions_on_device')
- self._domock(p + "_has_ntfs_filesystem", 'm_has_ntfs_filesystem')
- self._domock(p + "util.mount_cb", 'm_mount_cb')
- self._domock(p + "os.path.realpath", 'm_realpath')
- self._domock(p + "os.path.exists", 'm_exists')
- self._domock(p + "util.SeLinuxGuard", 'm_selguard')
+ self._domock(p + "_partitions_on_device", "m_partitions_on_device")
+ self._domock(p + "_has_ntfs_filesystem", "m_has_ntfs_filesystem")
+ self._domock(p + "util.mount_cb", "m_mount_cb")
+ self._domock(p + "os.path.realpath", "m_realpath")
+ self._domock(p + "os.path.exists", "m_exists")
+ self._domock(p + "util.SeLinuxGuard", "m_selguard")
self.m_exists.side_effect = lambda p: p in bypath
self.m_realpath.side_effect = realpath
@@ -2107,330 +2381,433 @@ class TestCanDevBeReformatted(CiTestCase):
def test_three_partitions_is_false(self):
"""A disk with 3 partitions can not be formatted."""
- self.patchup({
- '/dev/sda': {
- 'partitions': {
- '/dev/sda1': {'num': 1},
- '/dev/sda2': {'num': 2},
- '/dev/sda3': {'num': 3},
- }}})
- value, msg = dsaz.can_dev_be_reformatted("/dev/sda",
- preserve_ntfs=False)
+ self.patchup(
+ {
+ "/dev/sda": {
+ "partitions": {
+ "/dev/sda1": {"num": 1},
+ "/dev/sda2": {"num": 2},
+ "/dev/sda3": {"num": 3},
+ }
+ }
+ }
+ )
+ value, msg = dsaz.can_dev_be_reformatted(
+ "/dev/sda", preserve_ntfs=False
+ )
self.assertFalse(value)
self.assertIn("3 or more", msg.lower())
def test_no_partitions_is_false(self):
"""A disk with no partitions can not be formatted."""
- self.patchup({'/dev/sda': {}})
- value, msg = dsaz.can_dev_be_reformatted("/dev/sda",
- preserve_ntfs=False)
+ self.patchup({"/dev/sda": {}})
+ value, msg = dsaz.can_dev_be_reformatted(
+ "/dev/sda", preserve_ntfs=False
+ )
self.assertFalse(value)
self.assertIn("not partitioned", msg.lower())
def test_two_partitions_not_ntfs_false(self):
"""2 partitions and 2nd not ntfs can not be formatted."""
- self.patchup({
- '/dev/sda': {
- 'partitions': {
- '/dev/sda1': {'num': 1},
- '/dev/sda2': {'num': 2, 'fs': 'ext4', 'files': []},
- }}})
- value, msg = dsaz.can_dev_be_reformatted("/dev/sda",
- preserve_ntfs=False)
+ self.patchup(
+ {
+ "/dev/sda": {
+ "partitions": {
+ "/dev/sda1": {"num": 1},
+ "/dev/sda2": {"num": 2, "fs": "ext4", "files": []},
+ }
+ }
+ }
+ )
+ value, msg = dsaz.can_dev_be_reformatted(
+ "/dev/sda", preserve_ntfs=False
+ )
self.assertFalse(value)
self.assertIn("not ntfs", msg.lower())
def test_two_partitions_ntfs_populated_false(self):
"""2 partitions and populated ntfs fs on 2nd can not be formatted."""
- self.patchup({
- '/dev/sda': {
- 'partitions': {
- '/dev/sda1': {'num': 1},
- '/dev/sda2': {'num': 2, 'fs': 'ntfs',
- 'files': ['secret.txt']},
- }}})
- value, msg = dsaz.can_dev_be_reformatted("/dev/sda",
- preserve_ntfs=False)
+ self.patchup(
+ {
+ "/dev/sda": {
+ "partitions": {
+ "/dev/sda1": {"num": 1},
+ "/dev/sda2": {
+ "num": 2,
+ "fs": "ntfs",
+ "files": ["secret.txt"],
+ },
+ }
+ }
+ }
+ )
+ value, msg = dsaz.can_dev_be_reformatted(
+ "/dev/sda", preserve_ntfs=False
+ )
self.assertFalse(value)
self.assertIn("files on it", msg.lower())
def test_two_partitions_ntfs_empty_is_true(self):
"""2 partitions and empty ntfs fs on 2nd can be formatted."""
- self.patchup({
- '/dev/sda': {
- 'partitions': {
- '/dev/sda1': {'num': 1},
- '/dev/sda2': {'num': 2, 'fs': 'ntfs', 'files': []},
- }}})
- value, msg = dsaz.can_dev_be_reformatted("/dev/sda",
- preserve_ntfs=False)
+ self.patchup(
+ {
+ "/dev/sda": {
+ "partitions": {
+ "/dev/sda1": {"num": 1},
+ "/dev/sda2": {"num": 2, "fs": "ntfs", "files": []},
+ }
+ }
+ }
+ )
+ value, msg = dsaz.can_dev_be_reformatted(
+ "/dev/sda", preserve_ntfs=False
+ )
self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_one_partition_not_ntfs_false(self):
"""1 partition witih fs other than ntfs can not be formatted."""
- self.patchup({
- '/dev/sda': {
- 'partitions': {
- '/dev/sda1': {'num': 1, 'fs': 'zfs'},
- }}})
- value, msg = dsaz.can_dev_be_reformatted("/dev/sda",
- preserve_ntfs=False)
+ self.patchup(
+ {
+ "/dev/sda": {
+ "partitions": {
+ "/dev/sda1": {"num": 1, "fs": "zfs"},
+ }
+ }
+ }
+ )
+ value, msg = dsaz.can_dev_be_reformatted(
+ "/dev/sda", preserve_ntfs=False
+ )
self.assertFalse(value)
self.assertIn("not ntfs", msg.lower())
def test_one_partition_ntfs_populated_false(self):
"""1 mountable ntfs partition with many files can not be formatted."""
- self.patchup({
- '/dev/sda': {
- 'partitions': {
- '/dev/sda1': {'num': 1, 'fs': 'ntfs',
- 'files': ['file1.txt', 'file2.exe']},
- }}})
- with mock.patch.object(dsaz.LOG, 'warning') as warning:
- value, msg = dsaz.can_dev_be_reformatted("/dev/sda",
- preserve_ntfs=False)
+ self.patchup(
+ {
+ "/dev/sda": {
+ "partitions": {
+ "/dev/sda1": {
+ "num": 1,
+ "fs": "ntfs",
+ "files": ["file1.txt", "file2.exe"],
+ },
+ }
+ }
+ }
+ )
+ with mock.patch.object(dsaz.LOG, "warning") as warning:
+ value, msg = dsaz.can_dev_be_reformatted(
+ "/dev/sda", preserve_ntfs=False
+ )
wmsg = warning.call_args[0][0]
- self.assertIn("looks like you're using NTFS on the ephemeral disk",
- wmsg)
+ self.assertIn(
+ "looks like you're using NTFS on the ephemeral disk", wmsg
+ )
self.assertFalse(value)
self.assertIn("files on it", msg.lower())
def test_one_partition_ntfs_empty_is_true(self):
"""1 mountable ntfs partition and no files can be formatted."""
- self.patchup({
- '/dev/sda': {
- 'partitions': {
- '/dev/sda1': {'num': 1, 'fs': 'ntfs', 'files': []}
- }}})
- value, msg = dsaz.can_dev_be_reformatted("/dev/sda",
- preserve_ntfs=False)
+ self.patchup(
+ {
+ "/dev/sda": {
+ "partitions": {
+ "/dev/sda1": {"num": 1, "fs": "ntfs", "files": []}
+ }
+ }
+ }
+ )
+ value, msg = dsaz.can_dev_be_reformatted(
+ "/dev/sda", preserve_ntfs=False
+ )
self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_one_partition_ntfs_empty_with_dataloss_file_is_true(self):
"""1 mountable ntfs partition and only warn file can be formatted."""
- self.patchup({
- '/dev/sda': {
- 'partitions': {
- '/dev/sda1': {'num': 1, 'fs': 'ntfs',
- 'files': ['dataloss_warning_readme.txt']}
- }}})
- value, msg = dsaz.can_dev_be_reformatted("/dev/sda",
- preserve_ntfs=False)
+ self.patchup(
+ {
+ "/dev/sda": {
+ "partitions": {
+ "/dev/sda1": {
+ "num": 1,
+ "fs": "ntfs",
+ "files": ["dataloss_warning_readme.txt"],
+ }
+ }
+ }
+ }
+ )
+ value, msg = dsaz.can_dev_be_reformatted(
+ "/dev/sda", preserve_ntfs=False
+ )
self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_one_partition_through_realpath_is_true(self):
"""A symlink to a device with 1 ntfs partition can be formatted."""
- epath = '/dev/disk/cloud/azure_resource'
- self.patchup({
- epath: {
- 'realpath': '/dev/sdb',
- 'partitions': {
- epath + '-part1': {
- 'num': 1, 'fs': 'ntfs', 'files': [self.warning_file],
- 'realpath': '/dev/sdb1'}
- }}})
- value, msg = dsaz.can_dev_be_reformatted(epath,
- preserve_ntfs=False)
+ epath = "/dev/disk/cloud/azure_resource"
+ self.patchup(
+ {
+ epath: {
+ "realpath": "/dev/sdb",
+ "partitions": {
+ epath
+ + "-part1": {
+ "num": 1,
+ "fs": "ntfs",
+ "files": [self.warning_file],
+ "realpath": "/dev/sdb1",
+ }
+ },
+ }
+ }
+ )
+ value, msg = dsaz.can_dev_be_reformatted(epath, preserve_ntfs=False)
self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_three_partition_through_realpath_is_false(self):
"""A symlink to a device with 3 partitions can not be formatted."""
- epath = '/dev/disk/cloud/azure_resource'
- self.patchup({
- epath: {
- 'realpath': '/dev/sdb',
- 'partitions': {
- epath + '-part1': {
- 'num': 1, 'fs': 'ntfs', 'files': [self.warning_file],
- 'realpath': '/dev/sdb1'},
- epath + '-part2': {'num': 2, 'fs': 'ext3',
- 'realpath': '/dev/sdb2'},
- epath + '-part3': {'num': 3, 'fs': 'ext',
- 'realpath': '/dev/sdb3'}
- }}})
- value, msg = dsaz.can_dev_be_reformatted(epath,
- preserve_ntfs=False)
+ epath = "/dev/disk/cloud/azure_resource"
+ self.patchup(
+ {
+ epath: {
+ "realpath": "/dev/sdb",
+ "partitions": {
+ epath
+ + "-part1": {
+ "num": 1,
+ "fs": "ntfs",
+ "files": [self.warning_file],
+ "realpath": "/dev/sdb1",
+ },
+ epath
+ + "-part2": {
+ "num": 2,
+ "fs": "ext3",
+ "realpath": "/dev/sdb2",
+ },
+ epath
+ + "-part3": {
+ "num": 3,
+ "fs": "ext",
+ "realpath": "/dev/sdb3",
+ },
+ },
+ }
+ }
+ )
+ value, msg = dsaz.can_dev_be_reformatted(epath, preserve_ntfs=False)
self.assertFalse(value)
self.assertIn("3 or more", msg.lower())
def test_ntfs_mount_errors_true(self):
"""can_dev_be_reformatted does not fail if NTFS is unknown fstype."""
- self.patchup({
- '/dev/sda': {
- 'partitions': {
- '/dev/sda1': {'num': 1, 'fs': 'ntfs', 'files': []}
- }}})
+ self.patchup(
+ {
+ "/dev/sda": {
+ "partitions": {
+ "/dev/sda1": {"num": 1, "fs": "ntfs", "files": []}
+ }
+ }
+ }
+ )
error_msgs = [
"Stderr: mount: unknown filesystem type 'ntfs'", # RHEL
- "Stderr: mount: /dev/sdb1: unknown filesystem type 'ntfs'" # SLES
+ "Stderr: mount: /dev/sdb1: unknown filesystem type 'ntfs'", # SLES
]
for err_msg in error_msgs:
self.m_mount_cb.side_effect = MountFailedError(
- "Failed mounting %s to %s due to: \nUnexpected.\n%s" %
- ('/dev/sda', '/fake-tmp/dir', err_msg))
+ "Failed mounting %s to %s due to: \nUnexpected.\n%s"
+ % ("/dev/sda", "/fake-tmp/dir", err_msg)
+ )
- value, msg = dsaz.can_dev_be_reformatted('/dev/sda',
- preserve_ntfs=False)
+ value, msg = dsaz.can_dev_be_reformatted(
+ "/dev/sda", preserve_ntfs=False
+ )
self.assertTrue(value)
- self.assertIn('cannot mount NTFS, assuming', msg)
+ self.assertIn("cannot mount NTFS, assuming", msg)
def test_never_destroy_ntfs_config_false(self):
"""Normally formattable situation with never_destroy_ntfs set."""
- self.patchup({
- '/dev/sda': {
- 'partitions': {
- '/dev/sda1': {'num': 1, 'fs': 'ntfs',
- 'files': ['dataloss_warning_readme.txt']}
- }}})
- value, msg = dsaz.can_dev_be_reformatted("/dev/sda",
- preserve_ntfs=True)
+ self.patchup(
+ {
+ "/dev/sda": {
+ "partitions": {
+ "/dev/sda1": {
+ "num": 1,
+ "fs": "ntfs",
+ "files": ["dataloss_warning_readme.txt"],
+ }
+ }
+ }
+ }
+ )
+ value, msg = dsaz.can_dev_be_reformatted(
+ "/dev/sda", preserve_ntfs=True
+ )
self.assertFalse(value)
- self.assertIn("config says to never destroy NTFS "
- "(datasource.Azure.never_destroy_ntfs)", msg)
+ self.assertIn(
+ "config says to never destroy NTFS "
+ "(datasource.Azure.never_destroy_ntfs)",
+ msg,
+ )
class TestClearCachedData(CiTestCase):
-
def test_clear_cached_attrs_clears_imds(self):
"""All class attributes are reset to defaults, including imds data."""
tmp = self.tmp_dir()
- paths = helpers.Paths(
- {'cloud_dir': tmp, 'run_dir': tmp})
+ paths = helpers.Paths({"cloud_dir": tmp, "run_dir": tmp})
dsrc = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=paths)
clean_values = [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds]
- dsrc.metadata = 'md'
- dsrc.userdata = 'ud'
- dsrc._metadata_imds = 'imds'
+ dsrc.metadata = "md"
+ dsrc.userdata = "ud"
+ dsrc._metadata_imds = "imds"
dsrc._dirty_cache = True
dsrc.clear_cached_attrs()
self.assertEqual(
- [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds],
- clean_values)
+ [dsrc.metadata, dsrc.userdata, dsrc._metadata_imds], clean_values
+ )
class TestAzureNetExists(CiTestCase):
-
def test_azure_net_must_exist_for_legacy_objpkl(self):
"""DataSourceAzureNet must exist for old obj.pkl files
- that reference it."""
+ that reference it."""
self.assertTrue(hasattr(dsaz, "DataSourceAzureNet"))
class TestPreprovisioningReadAzureOvfFlag(CiTestCase):
-
def test_read_azure_ovf_with_true_flag(self):
"""The read_azure_ovf method should set the PreprovisionedVM
- cfg flag if the proper setting is present."""
+ cfg flag if the proper setting is present."""
content = construct_valid_ovf_env(
- platform_settings={"PreprovisionedVm": "True"})
+ platform_settings={"PreprovisionedVm": "True"}
+ )
ret = dsaz.read_azure_ovf(content)
cfg = ret[2]
- self.assertTrue(cfg['PreprovisionedVm'])
+ self.assertTrue(cfg["PreprovisionedVm"])
def test_read_azure_ovf_with_false_flag(self):
"""The read_azure_ovf method should set the PreprovisionedVM
- cfg flag to false if the proper setting is false."""
+ cfg flag to false if the proper setting is false."""
content = construct_valid_ovf_env(
- platform_settings={"PreprovisionedVm": "False"})
+ platform_settings={"PreprovisionedVm": "False"}
+ )
ret = dsaz.read_azure_ovf(content)
cfg = ret[2]
- self.assertFalse(cfg['PreprovisionedVm'])
+ self.assertFalse(cfg["PreprovisionedVm"])
def test_read_azure_ovf_without_flag(self):
"""The read_azure_ovf method should not set the
- PreprovisionedVM cfg flag."""
+ PreprovisionedVM cfg flag."""
content = construct_valid_ovf_env()
ret = dsaz.read_azure_ovf(content)
cfg = ret[2]
- self.assertFalse(cfg['PreprovisionedVm'])
+ self.assertFalse(cfg["PreprovisionedVm"])
self.assertEqual(None, cfg["PreprovisionedVMType"])
def test_read_azure_ovf_with_running_type(self):
"""The read_azure_ovf method should set PreprovisionedVMType
- cfg flag to Running."""
+ cfg flag to Running."""
content = construct_valid_ovf_env(
- platform_settings={"PreprovisionedVMType": "Running",
- "PreprovisionedVm": "True"})
+ platform_settings={
+ "PreprovisionedVMType": "Running",
+ "PreprovisionedVm": "True",
+ }
+ )
ret = dsaz.read_azure_ovf(content)
cfg = ret[2]
- self.assertTrue(cfg['PreprovisionedVm'])
- self.assertEqual("Running", cfg['PreprovisionedVMType'])
+ self.assertTrue(cfg["PreprovisionedVm"])
+ self.assertEqual("Running", cfg["PreprovisionedVMType"])
def test_read_azure_ovf_with_savable_type(self):
"""The read_azure_ovf method should set PreprovisionedVMType
- cfg flag to Savable."""
+ cfg flag to Savable."""
content = construct_valid_ovf_env(
- platform_settings={"PreprovisionedVMType": "Savable",
- "PreprovisionedVm": "True"})
+ platform_settings={
+ "PreprovisionedVMType": "Savable",
+ "PreprovisionedVm": "True",
+ }
+ )
ret = dsaz.read_azure_ovf(content)
cfg = ret[2]
- self.assertTrue(cfg['PreprovisionedVm'])
- self.assertEqual("Savable", cfg['PreprovisionedVMType'])
+ self.assertTrue(cfg["PreprovisionedVm"])
+ self.assertEqual("Savable", cfg["PreprovisionedVMType"])
-@mock.patch('os.path.isfile')
+@mock.patch("os.path.isfile")
class TestPreprovisioningShouldReprovision(CiTestCase):
-
def setUp(self):
super(TestPreprovisioningShouldReprovision, self).setUp()
tmp = self.tmp_dir()
- self.waagent_d = self.tmp_path('/var/lib/waagent', tmp)
- self.paths = helpers.Paths({'cloud_dir': tmp})
- dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
+ self.waagent_d = self.tmp_path("/var/lib/waagent", tmp)
+ self.paths = helpers.Paths({"cloud_dir": tmp})
+ dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d
- @mock.patch(MOCKPATH + 'util.write_file')
+ @mock.patch(MOCKPATH + "util.write_file")
def test__should_reprovision_with_true_cfg(self, isfile, write_f):
"""The _should_reprovision method should return true with config
- flag present."""
+ flag present."""
isfile.return_value = False
dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths)
- self.assertTrue(dsa._should_reprovision(
- (None, None, {'PreprovisionedVm': True}, None)))
+ self.assertTrue(
+ dsa._should_reprovision(
+ (None, None, {"PreprovisionedVm": True}, None)
+ )
+ )
def test__should_reprovision_with_file_existing(self, isfile):
"""The _should_reprovision method should return True if the sentinal
- exists."""
+ exists."""
isfile.return_value = True
dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths)
- self.assertTrue(dsa._should_reprovision(
- (None, None, {'preprovisionedvm': False}, None)))
+ self.assertTrue(
+ dsa._should_reprovision(
+ (None, None, {"preprovisionedvm": False}, None)
+ )
+ )
def test__should_reprovision_returns_false(self, isfile):
"""The _should_reprovision method should return False
- if config and sentinal are not present."""
+ if config and sentinal are not present."""
isfile.return_value = False
dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths)
self.assertFalse(dsa._should_reprovision((None, None, {}, None)))
- @mock.patch(MOCKPATH + 'util.write_file', autospec=True)
+ @mock.patch(MOCKPATH + "util.write_file", autospec=True)
def test__should_reprovision_uses_imds_md(self, write_file, isfile):
"""The _should_reprovision method should be able to
retrieve the preprovisioning VM type from imds metadata"""
isfile.return_value = False
dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths)
- self.assertTrue(dsa._should_reprovision(
- (None, None, {}, None),
- {'extended': {'compute': {'ppsType': 'Running'}}}))
- self.assertFalse(dsa._should_reprovision(
- (None, None, {}, None),
- {}))
- self.assertFalse(dsa._should_reprovision(
- (None, None, {}, None),
- {'extended': {'compute': {"hasCustomData": False}}}))
-
- @mock.patch(MOCKPATH + 'DataSourceAzure._poll_imds')
+ self.assertTrue(
+ dsa._should_reprovision(
+ (None, None, {}, None),
+ {"extended": {"compute": {"ppsType": "Running"}}},
+ )
+ )
+ self.assertFalse(dsa._should_reprovision((None, None, {}, None), {}))
+ self.assertFalse(
+ dsa._should_reprovision(
+ (None, None, {}, None),
+ {"extended": {"compute": {"hasCustomData": False}}},
+ )
+ )
+
+ @mock.patch(MOCKPATH + "DataSourceAzure._poll_imds")
def test_reprovision_calls__poll_imds(self, _poll_imds, isfile):
"""_reprovision will poll IMDS."""
isfile.return_value = False
hostname = "myhost"
username = "myuser"
- odata = {'HostName': hostname, 'UserName': username}
+ odata = {"HostName": hostname, "UserName": username}
_poll_imds.return_value = construct_valid_ovf_env(data=odata)
dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths)
dsa._reprovision()
@@ -2438,18 +2815,19 @@ class TestPreprovisioningShouldReprovision(CiTestCase):
class TestPreprovisioningHotAttachNics(CiTestCase):
-
def setUp(self):
super(TestPreprovisioningHotAttachNics, self).setUp()
self.tmp = self.tmp_dir()
- self.waagent_d = self.tmp_path('/var/lib/waagent', self.tmp)
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
- dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
-
- @mock.patch('cloudinit.sources.helpers.netlink.wait_for_nic_detach_event',
- autospec=True)
- @mock.patch(MOCKPATH + 'util.write_file', autospec=True)
+ self.waagent_d = self.tmp_path("/var/lib/waagent", self.tmp)
+ self.paths = helpers.Paths({"cloud_dir": self.tmp})
+ dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d
+ self.paths = helpers.Paths({"cloud_dir": self.tmp})
+
+ @mock.patch(
+ "cloudinit.sources.helpers.netlink.wait_for_nic_detach_event",
+ autospec=True,
+ )
+ @mock.patch(MOCKPATH + "util.write_file", autospec=True)
def test_nic_detach_writes_marker(self, m_writefile, m_detach):
"""When we detect that a nic gets detached, we write a marker for it"""
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
@@ -2458,16 +2836,17 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
m_detach.assert_called_with(nl_sock)
self.assertEqual(1, m_detach.call_count)
m_writefile.assert_called_with(
- dsaz.REPROVISION_NIC_DETACHED_MARKER_FILE, mock.ANY)
+ dsaz.REPROVISION_NIC_DETACHED_MARKER_FILE, mock.ANY
+ )
- @mock.patch(MOCKPATH + 'util.write_file', autospec=True)
- @mock.patch(MOCKPATH + 'DataSourceAzure.fallback_interface')
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4WithReporting')
- @mock.patch(MOCKPATH + 'DataSourceAzure._report_ready')
- @mock.patch(MOCKPATH + 'DataSourceAzure._wait_for_nic_detach')
+ @mock.patch(MOCKPATH + "util.write_file", autospec=True)
+ @mock.patch(MOCKPATH + "DataSourceAzure.fallback_interface")
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4WithReporting")
+ @mock.patch(MOCKPATH + "DataSourceAzure._report_ready")
+ @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach")
def test_detect_nic_attach_reports_ready_and_waits_for_detach(
- self, m_detach, m_report_ready, m_dhcp, m_fallback_if,
- m_writefile):
+ self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_writefile
+ ):
"""Report ready first and then wait for nic detach"""
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
dsa._wait_for_all_nics_ready()
@@ -2476,16 +2855,18 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
self.assertEqual(1, m_detach.call_count)
self.assertEqual(1, m_writefile.call_count)
self.assertEqual(1, m_dhcp.call_count)
- m_writefile.assert_called_with(dsaz.REPORTED_READY_MARKER_FILE,
- mock.ANY)
-
- @mock.patch('os.path.isfile')
- @mock.patch(MOCKPATH + 'DataSourceAzure.fallback_interface')
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4WithReporting')
- @mock.patch(MOCKPATH + 'DataSourceAzure._report_ready')
- @mock.patch(MOCKPATH + 'DataSourceAzure._wait_for_nic_detach')
+ m_writefile.assert_called_with(
+ dsaz.REPORTED_READY_MARKER_FILE, mock.ANY
+ )
+
+ @mock.patch("os.path.isfile")
+ @mock.patch(MOCKPATH + "DataSourceAzure.fallback_interface")
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4WithReporting")
+ @mock.patch(MOCKPATH + "DataSourceAzure._report_ready")
+ @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach")
def test_detect_nic_attach_skips_report_ready_when_marker_present(
- self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_isfile):
+ self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_isfile
+ ):
"""Skip reporting ready if we already have a marker file."""
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
@@ -2499,13 +2880,14 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
self.assertEqual(0, m_dhcp.call_count)
self.assertEqual(1, m_detach.call_count)
- @mock.patch('os.path.isfile')
- @mock.patch(MOCKPATH + 'DataSourceAzure.fallback_interface')
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4WithReporting')
- @mock.patch(MOCKPATH + 'DataSourceAzure._report_ready')
- @mock.patch(MOCKPATH + 'DataSourceAzure._wait_for_nic_detach')
+ @mock.patch("os.path.isfile")
+ @mock.patch(MOCKPATH + "DataSourceAzure.fallback_interface")
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4WithReporting")
+ @mock.patch(MOCKPATH + "DataSourceAzure._report_ready")
+ @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach")
def test_detect_nic_attach_skips_nic_detach_when_marker_present(
- self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_isfile):
+ self, m_detach, m_report_ready, m_dhcp, m_fallback_if, m_isfile
+ ):
"""Skip wait for nic detach if it already happened."""
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
@@ -2516,22 +2898,32 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
self.assertEqual(0, m_dhcp.call_count)
self.assertEqual(0, m_detach.call_count)
- @mock.patch(MOCKPATH + 'DataSourceAzure.wait_for_link_up', autospec=True)
- @mock.patch('cloudinit.sources.helpers.netlink.wait_for_nic_attach_event')
- @mock.patch('cloudinit.sources.net.find_fallback_nic')
- @mock.patch(MOCKPATH + 'get_metadata_from_imds')
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
- @mock.patch(MOCKPATH + 'DataSourceAzure._wait_for_nic_detach')
- @mock.patch('os.path.isfile')
+ @mock.patch(MOCKPATH + "DataSourceAzure.wait_for_link_up", autospec=True)
+ @mock.patch("cloudinit.sources.helpers.netlink.wait_for_nic_attach_event")
+ @mock.patch("cloudinit.sources.net.find_fallback_nic")
+ @mock.patch(MOCKPATH + "get_metadata_from_imds")
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4")
+ @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach")
+ @mock.patch("os.path.isfile")
def test_wait_for_nic_attach_if_no_fallback_interface(
- self, m_isfile, m_detach, m_dhcpv4, m_imds, m_fallback_if,
- m_attach, m_link_up):
+ self,
+ m_isfile,
+ m_detach,
+ m_dhcpv4,
+ m_imds,
+ m_fallback_if,
+ m_attach,
+ m_link_up,
+ ):
"""Wait for nic attach if we do not have a fallback interface"""
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
lease = {
- 'interface': 'eth9', 'fixed-address': '192.168.2.9',
- 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
- 'unknown-245': '624c3620'}
+ "interface": "eth9",
+ "fixed-address": "192.168.2.9",
+ "routers": "192.168.2.1",
+ "subnet-mask": "255.255.255.0",
+ "unknown-245": "624c3620",
+ }
m_isfile.return_value = True
m_attach.return_value = "eth0"
@@ -2550,22 +2942,32 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
self.assertEqual(1, m_link_up.call_count)
m_link_up.assert_called_with(mock.ANY, "eth0")
- @mock.patch(MOCKPATH + 'DataSourceAzure.wait_for_link_up')
- @mock.patch('cloudinit.sources.helpers.netlink.wait_for_nic_attach_event')
- @mock.patch('cloudinit.sources.net.find_fallback_nic')
- @mock.patch(MOCKPATH + 'DataSourceAzure.get_imds_data_with_api_fallback')
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
- @mock.patch(MOCKPATH + 'DataSourceAzure._wait_for_nic_detach')
- @mock.patch('os.path.isfile')
+ @mock.patch(MOCKPATH + "DataSourceAzure.wait_for_link_up")
+ @mock.patch("cloudinit.sources.helpers.netlink.wait_for_nic_attach_event")
+ @mock.patch("cloudinit.sources.net.find_fallback_nic")
+ @mock.patch(MOCKPATH + "DataSourceAzure.get_imds_data_with_api_fallback")
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4")
+ @mock.patch(MOCKPATH + "DataSourceAzure._wait_for_nic_detach")
+ @mock.patch("os.path.isfile")
def test_wait_for_nic_attach_multinic_attach(
- self, m_isfile, m_detach, m_dhcpv4, m_imds, m_fallback_if,
- m_attach, m_link_up):
+ self,
+ m_isfile,
+ m_detach,
+ m_dhcpv4,
+ m_imds,
+ m_fallback_if,
+ m_attach,
+ m_link_up,
+ ):
"""Wait for nic attach if we do not have a fallback interface"""
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
lease = {
- 'interface': 'eth9', 'fixed-address': '192.168.2.9',
- 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
- 'unknown-245': '624c3620'}
+ "interface": "eth9",
+ "fixed-address": "192.168.2.9",
+ "routers": "192.168.2.1",
+ "subnet-mask": "255.255.255.0",
+ "unknown-245": "624c3620",
+ }
m_attach_call_count = 0
def nic_attach_ret(nl_sock, nics_found):
@@ -2580,15 +2982,15 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
# Simulate two NICs by adding the same one twice.
md = {
"interface": [
- IMDS_NETWORK_METADATA['interface'][0],
- IMDS_NETWORK_METADATA['interface'][0]
+ IMDS_NETWORK_METADATA["interface"][0],
+ IMDS_NETWORK_METADATA["interface"][0],
]
}
def network_metadata_ret(ifname, retries, type, exc_cb, infinite):
if ifname == "eth0":
return md
- raise requests.Timeout('Fake connection timeout')
+ raise requests.Timeout("Fake connection timeout")
m_isfile.return_value = True
m_attach.side_effect = nic_attach_ret
@@ -2607,25 +3009,29 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
self.assertEqual(1, m_imds.call_count)
self.assertEqual(2, m_link_up.call_count)
- @mock.patch(MOCKPATH + 'DataSourceAzure.get_imds_data_with_api_fallback')
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
+ @mock.patch(MOCKPATH + "DataSourceAzure.get_imds_data_with_api_fallback")
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4")
def test_check_if_nic_is_primary_retries_on_failures(
- self, m_dhcpv4, m_imds):
+ self, m_dhcpv4, m_imds
+ ):
"""Retry polling for network metadata on all failures except timeout
and network unreachable errors"""
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
lease = {
- 'interface': 'eth9', 'fixed-address': '192.168.2.9',
- 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
- 'unknown-245': '624c3620'}
+ "interface": "eth9",
+ "fixed-address": "192.168.2.9",
+ "routers": "192.168.2.1",
+ "subnet-mask": "255.255.255.0",
+ "unknown-245": "624c3620",
+ }
eth0Retries = []
eth1Retries = []
# Simulate two NICs by adding the same one twice.
md = {
"interface": [
- IMDS_NETWORK_METADATA['interface'][0],
- IMDS_NETWORK_METADATA['interface'][0]
+ IMDS_NETWORK_METADATA["interface"][0],
+ IMDS_NETWORK_METADATA["interface"][0],
]
}
@@ -2645,9 +3051,9 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
# We are expected to retry for a certain period for both
# timeout errors and network unreachable errors.
if _ < 5:
- cause = requests.Timeout('Fake connection timeout')
+ cause = requests.Timeout("Fake connection timeout")
else:
- cause = requests.ConnectionError('Network Unreachable')
+ cause = requests.ConnectionError("Network Unreachable")
error = url_helper.UrlError(cause=cause)
eth1Retries.append(exc_cb("Connection timeout", error))
# Should stop retrying after 10 retries
@@ -2679,31 +3085,31 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
self.assertTrue(eth1Retries[i])
self.assertFalse(eth1Retries[10])
- @mock.patch('cloudinit.distros.networking.LinuxNetworking.try_set_link_up')
- def test_wait_for_link_up_returns_if_already_up(
- self, m_is_link_up):
+ @mock.patch("cloudinit.distros.networking.LinuxNetworking.try_set_link_up")
+ def test_wait_for_link_up_returns_if_already_up(self, m_is_link_up):
"""Waiting for link to be up should return immediately if the link is
- already up."""
+ already up."""
- distro_cls = distros.fetch('ubuntu')
- distro = distro_cls('ubuntu', {}, self.paths)
+ distro_cls = distros.fetch("ubuntu")
+ distro = distro_cls("ubuntu", {}, self.paths)
dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths)
m_is_link_up.return_value = True
dsa.wait_for_link_up("eth0")
self.assertEqual(1, m_is_link_up.call_count)
- @mock.patch(MOCKPATH + 'net.is_up', autospec=True)
- @mock.patch(MOCKPATH + 'util.write_file')
- @mock.patch('cloudinit.net.read_sys_net')
- @mock.patch('cloudinit.distros.networking.LinuxNetworking.try_set_link_up')
+ @mock.patch(MOCKPATH + "net.is_up", autospec=True)
+ @mock.patch(MOCKPATH + "util.write_file")
+ @mock.patch("cloudinit.net.read_sys_net")
+ @mock.patch("cloudinit.distros.networking.LinuxNetworking.try_set_link_up")
def test_wait_for_link_up_checks_link_after_sleep(
- self, m_try_set_link_up, m_read_sys_net, m_writefile, m_is_up):
+ self, m_try_set_link_up, m_read_sys_net, m_writefile, m_is_up
+ ):
"""Waiting for link to be up should return immediately if the link is
- already up."""
+ already up."""
- distro_cls = distros.fetch('ubuntu')
- distro = distro_cls('ubuntu', {}, self.paths)
+ distro_cls = distros.fetch("ubuntu")
+ distro = distro_cls("ubuntu", {}, self.paths)
dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths)
m_try_set_link_up.return_value = False
@@ -2718,21 +3124,22 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
m_is_up.side_effect = is_up_mock
- with mock.patch('cloudinit.sources.DataSourceAzure.sleep'):
+ with mock.patch("cloudinit.sources.DataSourceAzure.sleep"):
dsa.wait_for_link_up("eth0")
self.assertEqual(2, m_try_set_link_up.call_count)
self.assertEqual(2, m_is_up.call_count)
- @mock.patch(MOCKPATH + 'util.write_file')
- @mock.patch('cloudinit.net.read_sys_net')
- @mock.patch('cloudinit.distros.networking.LinuxNetworking.try_set_link_up')
+ @mock.patch(MOCKPATH + "util.write_file")
+ @mock.patch("cloudinit.net.read_sys_net")
+ @mock.patch("cloudinit.distros.networking.LinuxNetworking.try_set_link_up")
def test_wait_for_link_up_writes_to_device_file(
- self, m_is_link_up, m_read_sys_net, m_writefile):
+ self, m_is_link_up, m_read_sys_net, m_writefile
+ ):
"""Waiting for link to be up should return immediately if the link is
- already up."""
+ already up."""
- distro_cls = distros.fetch('ubuntu')
- distro = distro_cls('ubuntu', {}, self.paths)
+ distro_cls = distros.fetch("ubuntu")
+ distro = distro_cls("ubuntu", {}, self.paths)
dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths)
callcount = 0
@@ -2751,48 +3158,59 @@ class TestPreprovisioningHotAttachNics(CiTestCase):
self.assertEqual(1, m_read_sys_net.call_count)
self.assertEqual(2, m_writefile.call_count)
- @mock.patch('cloudinit.sources.helpers.netlink.'
- 'create_bound_netlink_socket')
+ @mock.patch(
+ "cloudinit.sources.helpers.netlink.create_bound_netlink_socket"
+ )
def test_wait_for_all_nics_ready_raises_if_socket_fails(self, m_socket):
"""Waiting for all nics should raise exception if netlink socket
- creation fails."""
+ creation fails."""
m_socket.side_effect = netlink.NetlinkCreateSocketError
- distro_cls = distros.fetch('ubuntu')
- distro = distro_cls('ubuntu', {}, self.paths)
+ distro_cls = distros.fetch("ubuntu")
+ distro = distro_cls("ubuntu", {}, self.paths)
dsa = dsaz.DataSourceAzure({}, distro=distro, paths=self.paths)
- self.assertRaises(netlink.NetlinkCreateSocketError,
- dsa._wait_for_all_nics_ready)
+ self.assertRaises(
+ netlink.NetlinkCreateSocketError, dsa._wait_for_all_nics_ready
+ )
# dsa._wait_for_all_nics_ready()
-@mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
-@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
-@mock.patch('cloudinit.sources.helpers.netlink.'
- 'wait_for_media_disconnect_connect')
-@mock.patch('requests.Session.request')
-@mock.patch(MOCKPATH + 'DataSourceAzure._report_ready')
+@mock.patch("cloudinit.net.dhcp.EphemeralIPv4Network")
+@mock.patch("cloudinit.net.dhcp.maybe_perform_dhcp_discovery")
+@mock.patch(
+ "cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect"
+)
+@mock.patch("requests.Session.request")
+@mock.patch(MOCKPATH + "DataSourceAzure._report_ready")
class TestPreprovisioningPollIMDS(CiTestCase):
-
def setUp(self):
super(TestPreprovisioningPollIMDS, self).setUp()
self.tmp = self.tmp_dir()
- self.waagent_d = self.tmp_path('/var/lib/waagent', self.tmp)
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
- dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
-
- @mock.patch('time.sleep', mock.MagicMock())
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
- def test_poll_imds_re_dhcp_on_timeout(self, m_dhcpv4, m_report_ready,
- m_request, m_media_switch, m_dhcp,
- m_net):
+ self.waagent_d = self.tmp_path("/var/lib/waagent", self.tmp)
+ self.paths = helpers.Paths({"cloud_dir": self.tmp})
+ dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d
+
+ @mock.patch("time.sleep", mock.MagicMock())
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4")
+ def test_poll_imds_re_dhcp_on_timeout(
+ self,
+ m_dhcpv4,
+ m_report_ready,
+ m_request,
+ m_media_switch,
+ m_dhcp,
+ m_net,
+ ):
"""The poll_imds will retry DHCP on IMDS timeout."""
- report_file = self.tmp_path('report_marker', self.tmp)
+ report_file = self.tmp_path("report_marker", self.tmp)
lease = {
- 'interface': 'eth9', 'fixed-address': '192.168.2.9',
- 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
- 'unknown-245': '624c3620'}
+ "interface": "eth9",
+ "fixed-address": "192.168.2.9",
+ "routers": "192.168.2.1",
+ "subnet-mask": "255.255.255.0",
+ "unknown-245": "624c3620",
+ }
m_dhcp.return_value = [lease]
m_media_switch.return_value = None
dhcp_ctx = mock.MagicMock(lease=lease)
@@ -2804,7 +3222,7 @@ class TestPreprovisioningPollIMDS(CiTestCase):
def fake_timeout_once(**kwargs):
self.tries += 1
if self.tries == 1:
- raise requests.Timeout('Fake connection timeout')
+ raise requests.Timeout("Fake connection timeout")
elif self.tries in (2, 3):
response = requests.Response()
response.status_code = 404 if self.tries == 2 else 410
@@ -2817,41 +3235,54 @@ class TestPreprovisioningPollIMDS(CiTestCase):
m_request.side_effect = fake_timeout_once
dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths)
- with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file):
+ with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file):
dsa._poll_imds()
self.assertEqual(m_report_ready.call_count, 1)
m_report_ready.assert_called_with(lease=lease)
- self.assertEqual(3, m_dhcpv4.call_count, 'Expected 3 DHCP calls')
- self.assertEqual(4, self.tries, 'Expected 4 total reads from IMDS')
+ self.assertEqual(3, m_dhcpv4.call_count, "Expected 3 DHCP calls")
+ self.assertEqual(4, self.tries, "Expected 4 total reads from IMDS")
- @mock.patch('os.path.isfile')
+ @mock.patch("os.path.isfile")
def test_poll_imds_skips_dhcp_if_ctx_present(
- self, m_isfile, report_ready_func, fake_resp, m_media_switch,
- m_dhcp, m_net):
+ self,
+ m_isfile,
+ report_ready_func,
+ fake_resp,
+ m_media_switch,
+ m_dhcp,
+ m_net,
+ ):
"""The poll_imds function should reuse the dhcp ctx if it is already
- present. This happens when we wait for nic to be hot-attached before
- polling for reprovisiondata. Note that if this ctx is set when
- _poll_imds is called, then it is not expected to be waiting for
- media_disconnect_connect either."""
- report_file = self.tmp_path('report_marker', self.tmp)
+ present. This happens when we wait for nic to be hot-attached before
+ polling for reprovisiondata. Note that if this ctx is set when
+ _poll_imds is called, then it is not expected to be waiting for
+ media_disconnect_connect either."""
+ report_file = self.tmp_path("report_marker", self.tmp)
m_isfile.return_value = True
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
dsa._ephemeral_dhcp_ctx = "Dummy dhcp ctx"
- with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file):
+ with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file):
dsa._poll_imds()
self.assertEqual(0, m_dhcp.call_count)
self.assertEqual(0, m_media_switch.call_count)
- @mock.patch('os.path.isfile')
- @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
+ @mock.patch("os.path.isfile")
+ @mock.patch(MOCKPATH + "EphemeralDHCPv4")
def test_poll_imds_does_dhcp_on_retries_if_ctx_present(
- self, m_ephemeral_dhcpv4, m_isfile, report_ready_func, m_request,
- m_media_switch, m_dhcp, m_net):
+ self,
+ m_ephemeral_dhcpv4,
+ m_isfile,
+ report_ready_func,
+ m_request,
+ m_media_switch,
+ m_dhcp,
+ m_net,
+ ):
"""The poll_imds function should reuse the dhcp ctx if it is already
- present. This happens when we wait for nic to be hot-attached before
- polling for reprovisiondata. Note that if this ctx is set when
- _poll_imds is called, then it is not expected to be waiting for
- media_disconnect_connect either."""
+ present. This happens when we wait for nic to be hot-attached before
+ polling for reprovisiondata. Note that if this ctx is set when
+ _poll_imds is called, then it is not expected to be waiting for
+ media_disconnect_connect either."""
tries = 0
@@ -2859,15 +3290,16 @@ class TestPreprovisioningPollIMDS(CiTestCase):
nonlocal tries
tries += 1
if tries == 1:
- raise requests.Timeout('Fake connection timeout')
+ raise requests.Timeout("Fake connection timeout")
return mock.MagicMock(status_code=200, text="good", content="good")
m_request.side_effect = fake_timeout_once
- report_file = self.tmp_path('report_marker', self.tmp)
+ report_file = self.tmp_path("report_marker", self.tmp)
m_isfile.return_value = True
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
- with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file),\
- mock.patch.object(dsa, '_ephemeral_dhcp_ctx') as m_dhcp_ctx:
+ with mock.patch(
+ MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file
+ ), mock.patch.object(dsa, "_ephemeral_dhcp_ctx") as m_dhcp_ctx:
m_dhcp_ctx.obtain_lease.return_value = "Dummy lease"
dsa._ephemeral_dhcp_ctx = m_dhcp_ctx
dsa._poll_imds()
@@ -2877,145 +3309,189 @@ class TestPreprovisioningPollIMDS(CiTestCase):
self.assertEqual(2, m_request.call_count)
def test_does_not_poll_imds_report_ready_when_marker_file_exists(
- self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net):
+ self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net
+ ):
"""poll_imds should not call report ready when the reported ready
marker file exists"""
- report_file = self.tmp_path('report_marker', self.tmp)
- write_file(report_file, content='dont run report_ready :)')
- m_dhcp.return_value = [{
- 'interface': 'eth9', 'fixed-address': '192.168.2.9',
- 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
- 'unknown-245': '624c3620'}]
+ report_file = self.tmp_path("report_marker", self.tmp)
+ write_file(report_file, content="dont run report_ready :)")
+ m_dhcp.return_value = [
+ {
+ "interface": "eth9",
+ "fixed-address": "192.168.2.9",
+ "routers": "192.168.2.1",
+ "subnet-mask": "255.255.255.0",
+ "unknown-245": "624c3620",
+ }
+ ]
m_media_switch.return_value = None
dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths)
- with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file):
+ with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file):
dsa._poll_imds()
self.assertEqual(m_report_ready.call_count, 0)
def test_poll_imds_report_ready_success_writes_marker_file(
- self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net):
+ self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net
+ ):
"""poll_imds should write the report_ready marker file if
reporting ready succeeds"""
- report_file = self.tmp_path('report_marker', self.tmp)
- m_dhcp.return_value = [{
- 'interface': 'eth9', 'fixed-address': '192.168.2.9',
- 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
- 'unknown-245': '624c3620'}]
+ report_file = self.tmp_path("report_marker", self.tmp)
+ m_dhcp.return_value = [
+ {
+ "interface": "eth9",
+ "fixed-address": "192.168.2.9",
+ "routers": "192.168.2.1",
+ "subnet-mask": "255.255.255.0",
+ "unknown-245": "624c3620",
+ }
+ ]
m_media_switch.return_value = None
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
self.assertFalse(os.path.exists(report_file))
- with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file):
+ with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file):
dsa._poll_imds()
self.assertEqual(m_report_ready.call_count, 1)
self.assertTrue(os.path.exists(report_file))
def test_poll_imds_report_ready_failure_raises_exc_and_doesnt_write_marker(
- self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net):
+ self, m_report_ready, m_request, m_media_switch, m_dhcp, m_net
+ ):
"""poll_imds should write the report_ready marker file if
reporting ready succeeds"""
- report_file = self.tmp_path('report_marker', self.tmp)
- m_dhcp.return_value = [{
- 'interface': 'eth9', 'fixed-address': '192.168.2.9',
- 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
- 'unknown-245': '624c3620'}]
+ report_file = self.tmp_path("report_marker", self.tmp)
+ m_dhcp.return_value = [
+ {
+ "interface": "eth9",
+ "fixed-address": "192.168.2.9",
+ "routers": "192.168.2.1",
+ "subnet-mask": "255.255.255.0",
+ "unknown-245": "624c3620",
+ }
+ ]
m_media_switch.return_value = None
m_report_ready.return_value = False
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
self.assertFalse(os.path.exists(report_file))
- with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file):
- self.assertRaises(
- InvalidMetaDataException,
- dsa._poll_imds)
+ with mock.patch(MOCKPATH + "REPORTED_READY_MARKER_FILE", report_file):
+ self.assertRaises(InvalidMetaDataException, dsa._poll_imds)
self.assertEqual(m_report_ready.call_count, 1)
self.assertFalse(os.path.exists(report_file))
-@mock.patch(MOCKPATH + 'DataSourceAzure._report_ready', mock.MagicMock())
-@mock.patch(MOCKPATH + 'subp.subp', mock.MagicMock())
-@mock.patch(MOCKPATH + 'util.write_file', mock.MagicMock())
-@mock.patch('cloudinit.sources.helpers.netlink.'
- 'wait_for_media_disconnect_connect')
-@mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network', autospec=True)
-@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
-@mock.patch('requests.Session.request')
+@mock.patch(MOCKPATH + "DataSourceAzure._report_ready", mock.MagicMock())
+@mock.patch(MOCKPATH + "subp.subp", mock.MagicMock())
+@mock.patch(MOCKPATH + "util.write_file", mock.MagicMock())
+@mock.patch(
+ "cloudinit.sources.helpers.netlink.wait_for_media_disconnect_connect"
+)
+@mock.patch("cloudinit.net.dhcp.EphemeralIPv4Network", autospec=True)
+@mock.patch("cloudinit.net.dhcp.maybe_perform_dhcp_discovery")
+@mock.patch("requests.Session.request")
class TestAzureDataSourcePreprovisioning(CiTestCase):
-
def setUp(self):
super(TestAzureDataSourcePreprovisioning, self).setUp()
tmp = self.tmp_dir()
- self.waagent_d = self.tmp_path('/var/lib/waagent', tmp)
- self.paths = helpers.Paths({'cloud_dir': tmp})
- dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
+ self.waagent_d = self.tmp_path("/var/lib/waagent", tmp)
+ self.paths = helpers.Paths({"cloud_dir": tmp})
+ dsaz.BUILTIN_DS_CONFIG["data_dir"] = self.waagent_d
- def test_poll_imds_returns_ovf_env(self, m_request,
- m_dhcp, m_net,
- m_media_switch):
+ def test_poll_imds_returns_ovf_env(
+ self, m_request, m_dhcp, m_net, m_media_switch
+ ):
"""The _poll_imds method should return the ovf_env.xml."""
m_media_switch.return_value = None
- m_dhcp.return_value = [{
- 'interface': 'eth9', 'fixed-address': '192.168.2.9',
- 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0'}]
- url = 'http://{0}/metadata/reprovisiondata?api-version=2019-06-01'
+ m_dhcp.return_value = [
+ {
+ "interface": "eth9",
+ "fixed-address": "192.168.2.9",
+ "routers": "192.168.2.1",
+ "subnet-mask": "255.255.255.0",
+ }
+ ]
+ url = "http://{0}/metadata/reprovisiondata?api-version=2019-06-01"
host = "169.254.169.254"
full_url = url.format(host)
- m_request.return_value = mock.MagicMock(status_code=200, text="ovf",
- content="ovf")
+ m_request.return_value = mock.MagicMock(
+ status_code=200, text="ovf", content="ovf"
+ )
dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths)
self.assertTrue(len(dsa._poll_imds()) > 0)
- self.assertEqual(m_request.call_args_list,
- [mock.call(allow_redirects=True,
- headers={'Metadata': 'true',
- 'User-Agent':
- 'Cloud-Init/%s' % vs()
- }, method='GET',
- timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS,
- url=full_url)])
+ self.assertEqual(
+ m_request.call_args_list,
+ [
+ mock.call(
+ allow_redirects=True,
+ headers={
+ "Metadata": "true",
+ "User-Agent": "Cloud-Init/%s" % vs(),
+ },
+ method="GET",
+ timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS,
+ url=full_url,
+ )
+ ],
+ )
self.assertEqual(m_dhcp.call_count, 2)
m_net.assert_any_call(
- broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
- prefix_or_mask='255.255.255.0', router='192.168.2.1',
- static_routes=None)
+ broadcast="192.168.2.255",
+ interface="eth9",
+ ip="192.168.2.9",
+ prefix_or_mask="255.255.255.0",
+ router="192.168.2.1",
+ static_routes=None,
+ )
self.assertEqual(m_net.call_count, 2)
- def test__reprovision_calls__poll_imds(self, m_request,
- m_dhcp, m_net,
- m_media_switch):
+ def test__reprovision_calls__poll_imds(
+ self, m_request, m_dhcp, m_net, m_media_switch
+ ):
"""The _reprovision method should call poll IMDS."""
m_media_switch.return_value = None
- m_dhcp.return_value = [{
- 'interface': 'eth9', 'fixed-address': '192.168.2.9',
- 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
- 'unknown-245': '624c3620'}]
- url = 'http://{0}/metadata/reprovisiondata?api-version=2019-06-01'
+ m_dhcp.return_value = [
+ {
+ "interface": "eth9",
+ "fixed-address": "192.168.2.9",
+ "routers": "192.168.2.1",
+ "subnet-mask": "255.255.255.0",
+ "unknown-245": "624c3620",
+ }
+ ]
+ url = "http://{0}/metadata/reprovisiondata?api-version=2019-06-01"
host = "169.254.169.254"
full_url = url.format(host)
hostname = "myhost"
username = "myuser"
- odata = {'HostName': hostname, 'UserName': username}
+ odata = {"HostName": hostname, "UserName": username}
content = construct_valid_ovf_env(data=odata)
- m_request.return_value = mock.MagicMock(status_code=200, text=content,
- content=content)
+ m_request.return_value = mock.MagicMock(
+ status_code=200, text=content, content=content
+ )
dsa = dsaz.DataSourceAzure({}, distro=mock.Mock(), paths=self.paths)
md, _ud, cfg, _d = dsa._reprovision()
- self.assertEqual(md['local-hostname'], hostname)
- self.assertEqual(cfg['system_info']['default_user']['name'], username)
+ self.assertEqual(md["local-hostname"], hostname)
+ self.assertEqual(cfg["system_info"]["default_user"]["name"], username)
self.assertIn(
mock.call(
allow_redirects=True,
headers={
- 'Metadata': 'true',
- 'User-Agent': 'Cloud-Init/%s' % vs()
+ "Metadata": "true",
+ "User-Agent": "Cloud-Init/%s" % vs(),
},
- method='GET',
+ method="GET",
timeout=dsaz.IMDS_TIMEOUT_IN_SECONDS,
- url=full_url
+ url=full_url,
),
- m_request.call_args_list)
+ m_request.call_args_list,
+ )
self.assertEqual(m_dhcp.call_count, 2)
m_net.assert_any_call(
- broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
- prefix_or_mask='255.255.255.0', router='192.168.2.1',
- static_routes=None)
+ broadcast="192.168.2.255",
+ interface="eth9",
+ ip="192.168.2.9",
+ prefix_or_mask="255.255.255.0",
+ router="192.168.2.1",
+ static_routes=None,
+ )
self.assertEqual(m_net.call_count, 2)
@@ -3029,36 +3505,42 @@ class TestRemoveUbuntuNetworkConfigScripts(CiTestCase):
def test_remove_network_scripts_removes_both_files_and_directories(self):
"""Any files or directories in paths are removed when present."""
- file1 = self.tmp_path('file1', dir=self.tmp)
- subdir = self.tmp_path('sub1', dir=self.tmp)
- subfile = self.tmp_path('leaf1', dir=subdir)
- write_file(file1, 'file1content')
- write_file(subfile, 'leafcontent')
+ file1 = self.tmp_path("file1", dir=self.tmp)
+ subdir = self.tmp_path("sub1", dir=self.tmp)
+ subfile = self.tmp_path("leaf1", dir=subdir)
+ write_file(file1, "file1content")
+ write_file(subfile, "leafcontent")
dsaz.maybe_remove_ubuntu_network_config_scripts(paths=[subdir, file1])
for path in (file1, subdir, subfile):
- self.assertFalse(os.path.exists(path),
- 'Found unremoved: %s' % path)
+ self.assertFalse(
+ os.path.exists(path), "Found unremoved: %s" % path
+ )
expected_logs = [
- 'INFO: Removing Ubuntu extended network scripts because cloud-init'
- ' updates Azure network configuration on the following events:'
+ "INFO: Removing Ubuntu extended network scripts because cloud-init"
+ " updates Azure network configuration on the following events:"
" ['boot', 'boot-legacy']",
- 'Recursively deleting %s' % subdir,
- 'Attempting to remove %s' % file1]
+ "Recursively deleting %s" % subdir,
+ "Attempting to remove %s" % file1,
+ ]
for log in expected_logs:
self.assertIn(log, self.logs.getvalue())
def test_remove_network_scripts_only_attempts_removal_if_path_exists(self):
"""Any files or directories absent are skipped without error."""
- dsaz.maybe_remove_ubuntu_network_config_scripts(paths=[
- self.tmp_path('nodirhere/', dir=self.tmp),
- self.tmp_path('notfilehere', dir=self.tmp)])
- self.assertNotIn('/not/a', self.logs.getvalue()) # No delete logs
-
- @mock.patch(MOCKPATH + 'os.path.exists')
- def test_remove_network_scripts_default_removes_stock_scripts(self,
- m_exists):
+ dsaz.maybe_remove_ubuntu_network_config_scripts(
+ paths=[
+ self.tmp_path("nodirhere/", dir=self.tmp),
+ self.tmp_path("notfilehere", dir=self.tmp),
+ ]
+ )
+ self.assertNotIn("/not/a", self.logs.getvalue()) # No delete logs
+
+ @mock.patch(MOCKPATH + "os.path.exists")
+ def test_remove_network_scripts_default_removes_stock_scripts(
+ self, m_exists
+ ):
"""Azure's stock ubuntu image scripts and artifacts are removed."""
# Report path absent on all to avoid delete operation
m_exists.return_value = False
@@ -3070,24 +3552,25 @@ class TestRemoveUbuntuNetworkConfigScripts(CiTestCase):
class TestWBIsPlatformViable(CiTestCase):
"""White box tests for _is_platform_viable."""
+
with_logs = True
- @mock.patch(MOCKPATH + 'dmi.read_dmi_data')
+ @mock.patch(MOCKPATH + "dmi.read_dmi_data")
def test_true_on_non_azure_chassis(self, m_read_dmi_data):
"""Return True if DMI chassis-asset-tag is AZURE_CHASSIS_ASSET_TAG."""
m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG
- self.assertTrue(dsaz._is_platform_viable('doesnotmatter'))
+ self.assertTrue(dsaz._is_platform_viable("doesnotmatter"))
- @mock.patch(MOCKPATH + 'os.path.exists')
- @mock.patch(MOCKPATH + 'dmi.read_dmi_data')
+ @mock.patch(MOCKPATH + "os.path.exists")
+ @mock.patch(MOCKPATH + "dmi.read_dmi_data")
def test_true_on_azure_ovf_env_in_seed_dir(self, m_read_dmi_data, m_exist):
"""Return True if ovf-env.xml exists in known seed dirs."""
# Non-matching Azure chassis-asset-tag
- m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG + 'X'
+ m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG + "X"
m_exist.return_value = True
- self.assertTrue(dsaz._is_platform_viable('/some/seed/dir'))
- m_exist.called_once_with('/other/seed/dir')
+ self.assertTrue(dsaz._is_platform_viable("/some/seed/dir"))
+ m_exist.called_once_with("/other/seed/dir")
def test_false_on_no_matching_azure_criteria(self):
"""Report non-azure on unmatched asset tag, ovf-env absent and no dev.
@@ -3096,17 +3579,25 @@ class TestWBIsPlatformViable(CiTestCase):
AZURE_CHASSIS_ASSET_TAG, no ovf-env.xml files exist in known seed dirs
and no devices have a label starting with prefix 'rd_rdfe_'.
"""
- self.assertFalse(wrap_and_call(
- MOCKPATH,
- {'os.path.exists': False,
- # Non-matching Azure chassis-asset-tag
- 'dmi.read_dmi_data': dsaz.AZURE_CHASSIS_ASSET_TAG + 'X',
- 'subp.which': None},
- dsaz._is_platform_viable, 'doesnotmatter'))
+ self.assertFalse(
+ wrap_and_call(
+ MOCKPATH,
+ {
+ "os.path.exists": False,
+ # Non-matching Azure chassis-asset-tag
+ "dmi.read_dmi_data": dsaz.AZURE_CHASSIS_ASSET_TAG + "X",
+ "subp.which": None,
+ },
+ dsaz._is_platform_viable,
+ "doesnotmatter",
+ )
+ )
self.assertIn(
"DEBUG: Non-Azure DMI asset tag '{0}' discovered.\n".format(
- dsaz.AZURE_CHASSIS_ASSET_TAG + 'X'),
- self.logs.getvalue())
+ dsaz.AZURE_CHASSIS_ASSET_TAG + "X"
+ ),
+ self.logs.getvalue(),
+ )
class TestRandomSeed(CiTestCase):
@@ -3120,13 +3611,14 @@ class TestRandomSeed(CiTestCase):
path = resourceLocation("azure/non_unicode_random_string")
result = dsaz._get_random_seed(path)
- obj = {'seed': result}
+ obj = {"seed": result}
try:
serialized = json_dumps(obj)
deserialized = load_json(serialized)
except UnicodeDecodeError:
self.fail("Non-serializable random seed returned")
- self.assertEqual(deserialized['seed'], result)
+ self.assertEqual(deserialized["seed"], result)
+
# vi: ts=4 expandtab