summaryrefslogtreecommitdiff
path: root/tests/unittests/test_datasource/test_azure.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests/test_datasource/test_azure.py')
-rw-r--r--tests/unittests/test_datasource/test_azure.py244
1 files changed, 204 insertions, 40 deletions
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 7cb1812a..254e9876 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -5,20 +5,19 @@ from cloudinit.util import b64e, decode_binary, load_file, write_file
from cloudinit.sources import DataSourceAzure as dsaz
from cloudinit.util import find_freebsd_part
from cloudinit.util import get_path_dev_freebsd
-
+from cloudinit.version import version_string as vs
from cloudinit.tests.helpers import (CiTestCase, TestCase, populate_dir, mock,
ExitStack, PY26, SkipTest)
import crypt
import os
-import shutil
import stat
-import tempfile
import xml.etree.ElementTree as ET
import yaml
-def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
+def construct_valid_ovf_env(data=None, pubkeys=None,
+ userdata=None, platform_settings=None):
if data is None:
data = {'HostName': 'FOOHOST'}
if pubkeys is None:
@@ -38,9 +37,9 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
"""
for key, dval in data.items():
if isinstance(dval, dict):
- val = dval.get('text')
- attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v in dval.items()
- if k != 'text'])
+ val = dict(dval).get('text')
+ attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v
+ in dict(dval).items() if k != 'text'])
else:
val = dval
attrs = ""
@@ -68,10 +67,12 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
<KmsServerHostname>kms.core.windows.net</KmsServerHostname>
<ProvisionGuestAgent>false</ProvisionGuestAgent>
- <GuestAgentPackageName i:nil="true" />
- </PlatformSettings></wa:PlatformSettingsSection>
-</Environment>
- """
+ <GuestAgentPackageName i:nil="true" />"""
+ if platform_settings:
+ for k, v in platform_settings.items():
+ content += "<%s>%s</%s>\n" % (k, v, k)
+ content += """</PlatformSettings></wa:PlatformSettingsSection>
+</Environment>"""
return content
@@ -84,11 +85,11 @@ class TestAzureDataSource(CiTestCase):
super(TestAzureDataSource, self).setUp()
if PY26:
raise SkipTest("Does not work on python 2.6")
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.tmp = self.tmp_dir()
# patch cloud_dir, so our 'seed_dir' is guaranteed empty
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
self.patches = ExitStack()
@@ -176,6 +177,7 @@ scbus-1 on xpt0 bus 0
(dsaz, 'get_hostname', mock.MagicMock()),
(dsaz, 'set_hostname', mock.MagicMock()),
(dsaz, 'get_metadata_from_fabric', self.get_metadata_from_fabric),
+ (dsaz.util, 'which', lambda x: True),
(dsaz.util, 'read_dmi_data', mock.MagicMock(
side_effect=_dmi_mocks)),
(dsaz.util, 'wait_for_files', mock.MagicMock(
@@ -642,7 +644,9 @@ fdescfs /dev/fd fdescfs rw 0 0
self.assertEqual(netconfig, expected_config)
-class TestAzureBounce(TestCase):
+class TestAzureBounce(CiTestCase):
+
+ with_logs = True
def mock_out_azure_moving_parts(self):
self.patches.enter_context(
@@ -655,6 +659,8 @@ class TestAzureBounce(TestCase):
self.patches.enter_context(
mock.patch.object(dsaz, 'get_metadata_from_fabric',
mock.MagicMock(return_value={})))
+ self.patches.enter_context(
+ mock.patch.object(dsaz.util, 'which', lambda x: True))
def _dmi_mocks(key):
if key == 'system-uuid':
@@ -669,10 +675,10 @@ class TestAzureBounce(TestCase):
def setUp(self):
super(TestAzureBounce, self).setUp()
- self.tmp = tempfile.mkdtemp()
+ self.tmp = self.tmp_dir()
self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
self.patches = ExitStack()
self.mock_out_azure_moving_parts()
@@ -714,21 +720,24 @@ class TestAzureBounce(TestCase):
def test_disabled_bounce_does_not_change_hostname(self):
cfg = {'hostname_bounce': {'policy': 'off'}}
- self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
+ ds = self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg))
+ ds.get_data()
self.assertEqual(0, self.set_hostname.call_count)
@mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
def test_disabled_bounce_does_not_perform_bounce(
self, perform_hostname_bounce):
cfg = {'hostname_bounce': {'policy': 'off'}}
- self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
+ ds = self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg))
+ ds.get_data()
self.assertEqual(0, perform_hostname_bounce.call_count)
def test_same_hostname_does_not_change_hostname(self):
host_name = 'unchanged-host-name'
self.get_hostname.return_value = host_name
cfg = {'hostname_bounce': {'policy': 'yes'}}
- self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ ds = self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg))
+ ds.get_data()
self.assertEqual(0, self.set_hostname.call_count)
@mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
@@ -737,7 +746,8 @@ class TestAzureBounce(TestCase):
host_name = 'unchanged-host-name'
self.get_hostname.return_value = host_name
cfg = {'hostname_bounce': {'policy': 'yes'}}
- self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ ds = self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg))
+ ds.get_data()
self.assertEqual(0, perform_hostname_bounce.call_count)
@mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
@@ -751,6 +761,22 @@ class TestAzureBounce(TestCase):
self.assertTrue(ret)
self.assertEqual(1, perform_hostname_bounce.call_count)
+ def test_bounce_skipped_on_ifupdown_absent(self):
+ host_name = 'unchanged-host-name'
+ self.get_hostname.return_value = host_name
+ cfg = {'hostname_bounce': {'policy': 'force'}}
+ dsrc = self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg),
+ agent_command=['not', '__builtin__'])
+ patch_path = 'cloudinit.sources.DataSourceAzure.util.which'
+ with mock.patch(patch_path) as m_which:
+ m_which.return_value = None
+ ret = self._get_and_setup(dsrc)
+ self.assertEqual([mock.call('ifup')], m_which.call_args_list)
+ self.assertTrue(ret)
+ self.assertIn(
+ "Skipping network bounce: ifupdown utils aren't present.",
+ self.logs.getvalue())
+
def test_different_hostnames_sets_hostname(self):
expected_hostname = 'azure-expected-host-name'
self.get_hostname.return_value = 'default-host-name'
@@ -815,9 +841,7 @@ class TestAzureBounce(TestCase):
self.assertEqual(hostname, bounce_env['hostname'])
self.assertEqual(old_hostname, bounce_env['old_hostname'])
- def test_default_bounce_command_used_by_default(self):
- cmd = 'default-bounce-command'
- dsaz.BUILTIN_DS_CONFIG['hostname_bounce']['command'] = cmd
+ def test_default_bounce_command_ifup_used_by_default(self):
cfg = {'hostname_bounce': {'policy': 'force'}}
data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
dsrc = self._get_ds(data, agent_command=['not', '__builtin__'])
@@ -825,7 +849,8 @@ class TestAzureBounce(TestCase):
self.assertTrue(ret)
self.assertEqual(1, self.subp.call_count)
bounce_args = self.subp.call_args[1]['args']
- self.assertEqual(cmd, bounce_args)
+ self.assertEqual(
+ dsaz.BOUNCE_COMMAND_IFUP, bounce_args)
@mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
def test_set_hostname_option_can_disable_bounce(
@@ -895,9 +920,6 @@ class TestCanDevBeReformatted(CiTestCase):
setattr(self, sattr, patcher.start())
self.addCleanup(patcher.stop)
- def setUp(self):
- super(TestCanDevBeReformatted, self).setUp()
-
def patchup(self, devs):
bypath = {}
for path, data in devs.items():
@@ -952,14 +974,14 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda3': {'num': 3},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertFalse(False, value)
+ self.assertFalse(value)
self.assertIn("3 or more", msg.lower())
def test_no_partitions_is_false(self):
"""A disk with no partitions can not be formatted."""
self.patchup({'/dev/sda': {}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(False, value)
+ self.assertFalse(value)
self.assertIn("not partitioned", msg.lower())
def test_two_partitions_not_ntfs_false(self):
@@ -971,7 +993,7 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda2': {'num': 2, 'fs': 'ext4', 'files': []},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertFalse(False, value)
+ self.assertFalse(value)
self.assertIn("not ntfs", msg.lower())
def test_two_partitions_ntfs_populated_false(self):
@@ -984,7 +1006,7 @@ class TestCanDevBeReformatted(CiTestCase):
'files': ['secret.txt']},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertFalse(False, value)
+ self.assertFalse(value)
self.assertIn("files on it", msg.lower())
def test_two_partitions_ntfs_empty_is_true(self):
@@ -996,7 +1018,7 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda2': {'num': 2, 'fs': 'ntfs', 'files': []},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(True, value)
+ self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_one_partition_not_ntfs_false(self):
@@ -1007,7 +1029,7 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda1': {'num': 1, 'fs': 'zfs'},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(False, value)
+ self.assertFalse(value)
self.assertIn("not ntfs", msg.lower())
def test_one_partition_ntfs_populated_false(self):
@@ -1019,7 +1041,7 @@ class TestCanDevBeReformatted(CiTestCase):
'files': ['file1.txt', 'file2.exe']},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(False, value)
+ self.assertFalse(value)
self.assertIn("files on it", msg.lower())
def test_one_partition_ntfs_empty_is_true(self):
@@ -1030,7 +1052,7 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda1': {'num': 1, 'fs': 'ntfs', 'files': []}
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(True, value)
+ self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_one_partition_ntfs_empty_with_dataloss_file_is_true(self):
@@ -1042,7 +1064,7 @@ class TestCanDevBeReformatted(CiTestCase):
'files': ['dataloss_warning_readme.txt']}
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(True, value)
+ self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_one_partition_through_realpath_is_true(self):
@@ -1057,7 +1079,7 @@ class TestCanDevBeReformatted(CiTestCase):
'realpath': '/dev/sdb1'}
}}})
value, msg = dsaz.can_dev_be_reformatted(epath)
- self.assertEqual(True, value)
+ self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_three_partition_through_realpath_is_false(self):
@@ -1076,7 +1098,7 @@ class TestCanDevBeReformatted(CiTestCase):
'realpath': '/dev/sdb3'}
}}})
value, msg = dsaz.can_dev_be_reformatted(epath)
- self.assertEqual(False, value)
+ self.assertFalse(value)
self.assertIn("3 or more", msg.lower())
@@ -1088,4 +1110,146 @@ class TestAzureNetExists(CiTestCase):
self.assertTrue(hasattr(dsaz, "DataSourceAzureNet"))
+@mock.patch('cloudinit.sources.DataSourceAzure.util.subp')
+@mock.patch.object(dsaz, 'get_hostname')
+@mock.patch.object(dsaz, 'set_hostname')
+class TestAzureDataSourcePreprovisioning(CiTestCase):
+
+ def setUp(self):
+ super(TestAzureDataSourcePreprovisioning, self).setUp()
+ tmp = self.tmp_dir()
+ self.waagent_d = self.tmp_path('/var/lib/waagent', tmp)
+ self.paths = helpers.Paths({'cloud_dir': tmp})
+ dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
+
+ def test_read_azure_ovf_with_true_flag(self, *args):
+ """The read_azure_ovf method should set the PreprovisionedVM
+ cfg flag if the proper setting is present."""
+ content = construct_valid_ovf_env(
+ platform_settings={"PreprovisionedVm": "True"})
+ ret = dsaz.read_azure_ovf(content)
+ cfg = ret[2]
+ self.assertTrue(cfg['PreprovisionedVm'])
+
+ def test_read_azure_ovf_with_false_flag(self, *args):
+ """The read_azure_ovf method should set the PreprovisionedVM
+ cfg flag to false if the proper setting is false."""
+ content = construct_valid_ovf_env(
+ platform_settings={"PreprovisionedVm": "False"})
+ ret = dsaz.read_azure_ovf(content)
+ cfg = ret[2]
+ self.assertFalse(cfg['PreprovisionedVm'])
+
+ def test_read_azure_ovf_without_flag(self, *args):
+ """The read_azure_ovf method should not set the
+ PreprovisionedVM cfg flag."""
+ content = construct_valid_ovf_env()
+ ret = dsaz.read_azure_ovf(content)
+ cfg = ret[2]
+ self.assertFalse(cfg['PreprovisionedVm'])
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.is_FreeBSD')
+ @mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
+ @mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
+ @mock.patch('requests.Session.request')
+ def test_poll_imds_returns_ovf_env(self, fake_resp, m_dhcp, m_net,
+ m_is_bsd, *args):
+ """The _poll_imds method should return the ovf_env.xml."""
+ m_is_bsd.return_value = False
+ m_dhcp.return_value = [{
+ 'interface': 'eth9', 'fixed-address': '192.168.2.9',
+ 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0'}]
+ url = 'http://{0}/metadata/reprovisiondata?api-version=2017-04-02'
+ host = "169.254.169.254"
+ full_url = url.format(host)
+ fake_resp.return_value = mock.MagicMock(status_code=200, text="ovf")
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ self.assertTrue(len(dsa._poll_imds()) > 0)
+ self.assertEqual(fake_resp.call_args_list,
+ [mock.call(allow_redirects=True,
+ headers={'Metadata': 'true',
+ 'User-Agent':
+ 'Cloud-Init/%s' % vs()
+ }, method='GET', timeout=60.0,
+ url=full_url),
+ mock.call(allow_redirects=True,
+ headers={'Metadata': 'true',
+ 'User-Agent':
+ 'Cloud-Init/%s' % vs()
+ }, method='GET', url=full_url)])
+ self.assertEqual(m_dhcp.call_count, 1)
+ m_net.assert_any_call(
+ broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
+ prefix_or_mask='255.255.255.0', router='192.168.2.1')
+ self.assertEqual(m_net.call_count, 1)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.is_FreeBSD')
+ @mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
+ @mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
+ @mock.patch('requests.Session.request')
+ def test__reprovision_calls__poll_imds(self, fake_resp, m_dhcp, m_net,
+ m_is_bsd, *args):
+ """The _reprovision method should call poll IMDS."""
+ m_is_bsd.return_value = False
+ m_dhcp.return_value = [{
+ 'interface': 'eth9', 'fixed-address': '192.168.2.9',
+ 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
+ 'unknown-245': '624c3620'}]
+ url = 'http://{0}/metadata/reprovisiondata?api-version=2017-04-02'
+ host = "169.254.169.254"
+ full_url = url.format(host)
+ hostname = "myhost"
+ username = "myuser"
+ odata = {'HostName': hostname, 'UserName': username}
+ content = construct_valid_ovf_env(data=odata)
+ fake_resp.return_value = mock.MagicMock(status_code=200, text=content)
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ md, ud, cfg, d = dsa._reprovision()
+ self.assertEqual(md['local-hostname'], hostname)
+ self.assertEqual(cfg['system_info']['default_user']['name'], username)
+ self.assertEqual(fake_resp.call_args_list,
+ [mock.call(allow_redirects=True,
+ headers={'Metadata': 'true',
+ 'User-Agent':
+ 'Cloud-Init/%s' % vs()},
+ method='GET', timeout=60.0, url=full_url),
+ mock.call(allow_redirects=True,
+ headers={'Metadata': 'true',
+ 'User-Agent':
+ 'Cloud-Init/%s' % vs()},
+ method='GET', url=full_url)])
+ self.assertEqual(m_dhcp.call_count, 1)
+ m_net.assert_any_call(
+ broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
+ prefix_or_mask='255.255.255.0', router='192.168.2.1')
+ self.assertEqual(m_net.call_count, 1)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
+ @mock.patch('os.path.isfile')
+ def test__should_reprovision_with_true_cfg(self, isfile, write_f, *args):
+ """The _should_reprovision method should return true with config
+ flag present."""
+ isfile.return_value = False
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ self.assertTrue(dsa._should_reprovision(
+ (None, None, {'PreprovisionedVm': True}, None)))
+
+ @mock.patch('os.path.isfile')
+ def test__should_reprovision_with_file_existing(self, isfile, *args):
+ """The _should_reprovision method should return True if the sentinal
+ exists."""
+ isfile.return_value = True
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ self.assertTrue(dsa._should_reprovision(
+ (None, None, {'preprovisionedvm': False}, None)))
+
+ @mock.patch('os.path.isfile')
+ def test__should_reprovision_returns_false(self, isfile, *args):
+ """The _should_reprovision method should return False
+ if config and sentinal are not present."""
+ isfile.return_value = False
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ self.assertFalse(dsa._should_reprovision((None, None, {}, None)))
+
+
# vi: ts=4 expandtab