author    Chad Smith <chad.smith@canonical.com>  2018-02-02 11:11:36 -0700
committer Chad Smith <chad.smith@canonical.com>  2018-02-02 11:11:36 -0700
commit    78013bc65030421699b5feb66bc8b7a205abfbc0 (patch)
tree      2ebf7111129f4aaf8a833ba6d226d4513ed59388 /tests/unittests/test_datasource
parent    192261fe38a32edbd1f605ba25bbb6f4822a0720 (diff)
parent    f7deaf15acf382d62554e2b1d70daa9a9109d542 (diff)
merge from master at 17.2-30-gf7deaf15
Diffstat (limited to 'tests/unittests/test_datasource')
-rw-r--r--  tests/unittests/test_datasource/test_aliyun.py        |  18
-rw-r--r--  tests/unittests/test_datasource/test_altcloud.py      |  22
-rw-r--r--  tests/unittests/test_datasource/test_azure.py         | 244
-rw-r--r--  tests/unittests/test_datasource/test_cloudsigma.py    |  13
-rw-r--r--  tests/unittests/test_datasource/test_cloudstack.py    |  19
-rw-r--r--  tests/unittests/test_datasource/test_configdrive.py   |  62
-rw-r--r--  tests/unittests/test_datasource/test_digitalocean.py  |  20
-rw-r--r--  tests/unittests/test_datasource/test_ec2.py           |   8
-rw-r--r--  tests/unittests/test_datasource/test_gce.py           | 196
-rw-r--r--  tests/unittests/test_datasource/test_maas.py          |  53
-rw-r--r--  tests/unittests/test_datasource/test_nocloud.py       |  14
-rw-r--r--  tests/unittests/test_datasource/test_opennebula.py    | 235
-rw-r--r--  tests/unittests/test_datasource/test_openstack.py     |  12
-rw-r--r--  tests/unittests/test_datasource/test_ovf.py           | 111
-rw-r--r--  tests/unittests/test_datasource/test_scaleway.py      |  13
-rw-r--r--  tests/unittests/test_datasource/test_smartos.py       |   3
16 files changed, 833 insertions(+), 210 deletions(-)
diff --git a/tests/unittests/test_datasource/test_aliyun.py b/tests/unittests/test_datasource/test_aliyun.py
index 82ee9714..4fa9616b 100644
--- a/tests/unittests/test_datasource/test_aliyun.py
+++ b/tests/unittests/test_datasource/test_aliyun.py
@@ -47,6 +47,9 @@ def register_mock_metaserver(base_url, data):
elif isinstance(body, list):
register(base_url.rstrip('/'), '\n'.join(body) + '\n')
elif isinstance(body, dict):
+ if not body:
+ register(base_url.rstrip('/') + '/', 'not found',
+ status_code=404)
vals = []
for k, v in body.items():
if isinstance(v, (str, list)):
@@ -67,7 +70,7 @@ class TestAliYunDatasource(test_helpers.HttprettyTestCase):
super(TestAliYunDatasource, self).setUp()
cfg = {'datasource': {'AliYun': {'timeout': '1', 'max_wait': '1'}}}
distro = {}
- paths = helpers.Paths({})
+ paths = helpers.Paths({'run_dir': self.tmp_dir()})
self.ds = ay.DataSourceAliYun(cfg, distro, paths)
self.metadata_address = self.ds.metadata_urls[0]
@@ -91,9 +94,22 @@ class TestAliYunDatasource(test_helpers.HttprettyTestCase):
self.metadata_address,
self.ds.min_metadata_version, 'user-data')
+ # EC2 provides an instance-identity document which must return 404 here
+ # for this test to pass.
+ @property
+ def default_identity(self):
+ return {}
+
+ @property
+ def identity_url(self):
+ return os.path.join(self.metadata_address,
+ self.ds.min_metadata_version,
+ 'dynamic', 'instance-identity')
+
def regist_default_server(self):
register_mock_metaserver(self.metadata_url, self.default_metadata)
register_mock_metaserver(self.userdata_url, self.default_userdata)
+ register_mock_metaserver(self.identity_url, self.default_identity)
def _test_get_data(self):
self.assertEqual(self.ds.metadata, self.default_metadata)
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index a4dfb540..3253f3ad 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -18,7 +18,7 @@ import tempfile
from cloudinit import helpers
from cloudinit import util
-from cloudinit.tests.helpers import TestCase
+from cloudinit.tests.helpers import CiTestCase
import cloudinit.sources.DataSourceAltCloud as dsac
@@ -97,7 +97,7 @@ def _dmi_data(expected):
return _data
-class TestGetCloudType(TestCase):
+class TestGetCloudType(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.get_cloud_type()
'''
@@ -143,14 +143,16 @@ class TestGetCloudType(TestCase):
self.assertEqual('UNKNOWN', dsrc.get_cloud_type())
-class TestGetDataCloudInfoFile(TestCase):
+class TestGetDataCloudInfoFile(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.get_data()
With a contrived CLOUD_INFO_FILE
'''
def setUp(self):
'''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
self.cloud_info_file = tempfile.mkstemp()[1]
self.dmi_data = util.read_dmi_data
dsac.CLOUD_INFO_FILE = self.cloud_info_file
@@ -207,14 +209,16 @@ class TestGetDataCloudInfoFile(TestCase):
self.assertEqual(False, dsrc.get_data())
-class TestGetDataNoCloudInfoFile(TestCase):
+class TestGetDataNoCloudInfoFile(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.get_data()
Without a CLOUD_INFO_FILE
'''
def setUp(self):
'''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
self.dmi_data = util.read_dmi_data
dsac.CLOUD_INFO_FILE = \
'no such file'
@@ -254,7 +258,7 @@ class TestGetDataNoCloudInfoFile(TestCase):
self.assertEqual(False, dsrc.get_data())
-class TestUserDataRhevm(TestCase):
+class TestUserDataRhevm(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_rhevm()
'''
@@ -320,7 +324,7 @@ class TestUserDataRhevm(TestCase):
self.assertEqual(False, dsrc.user_data_rhevm())
-class TestUserDataVsphere(TestCase):
+class TestUserDataVsphere(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_vsphere()
'''
@@ -368,7 +372,7 @@ class TestUserDataVsphere(TestCase):
self.assertEqual(1, m_mount_cb.call_count)
-class TestReadUserDataCallback(TestCase):
+class TestReadUserDataCallback(CiTestCase):
'''
Test to exercise method: DataSourceAltCloud.read_user_data_callback()
'''
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 7cb1812a..254e9876 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -5,20 +5,19 @@ from cloudinit.util import b64e, decode_binary, load_file, write_file
from cloudinit.sources import DataSourceAzure as dsaz
from cloudinit.util import find_freebsd_part
from cloudinit.util import get_path_dev_freebsd
-
+from cloudinit.version import version_string as vs
from cloudinit.tests.helpers import (CiTestCase, TestCase, populate_dir, mock,
ExitStack, PY26, SkipTest)
import crypt
import os
-import shutil
import stat
-import tempfile
import xml.etree.ElementTree as ET
import yaml
-def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
+def construct_valid_ovf_env(data=None, pubkeys=None,
+ userdata=None, platform_settings=None):
if data is None:
data = {'HostName': 'FOOHOST'}
if pubkeys is None:
@@ -38,9 +37,9 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
"""
for key, dval in data.items():
if isinstance(dval, dict):
- val = dval.get('text')
- attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v in dval.items()
- if k != 'text'])
+ val = dict(dval).get('text')
+ attrs = ' ' + ' '.join(["%s='%s'" % (k, v) for k, v
+ in dict(dval).items() if k != 'text'])
else:
val = dval
attrs = ""
@@ -68,10 +67,12 @@ def construct_valid_ovf_env(data=None, pubkeys=None, userdata=None):
xmlns:i="http://www.w3.org/2001/XMLSchema-instance">
<KmsServerHostname>kms.core.windows.net</KmsServerHostname>
<ProvisionGuestAgent>false</ProvisionGuestAgent>
- <GuestAgentPackageName i:nil="true" />
- </PlatformSettings></wa:PlatformSettingsSection>
-</Environment>
- """
+ <GuestAgentPackageName i:nil="true" />"""
+ if platform_settings:
+ for k, v in platform_settings.items():
+ content += "<%s>%s</%s>\n" % (k, v, k)
+ content += """</PlatformSettings></wa:PlatformSettingsSection>
+</Environment>"""
return content
@@ -84,11 +85,11 @@ class TestAzureDataSource(CiTestCase):
super(TestAzureDataSource, self).setUp()
if PY26:
raise SkipTest("Does not work on python 2.6")
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.tmp = self.tmp_dir()
# patch cloud_dir, so our 'seed_dir' is guaranteed empty
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
self.patches = ExitStack()
@@ -176,6 +177,7 @@ scbus-1 on xpt0 bus 0
(dsaz, 'get_hostname', mock.MagicMock()),
(dsaz, 'set_hostname', mock.MagicMock()),
(dsaz, 'get_metadata_from_fabric', self.get_metadata_from_fabric),
+ (dsaz.util, 'which', lambda x: True),
(dsaz.util, 'read_dmi_data', mock.MagicMock(
side_effect=_dmi_mocks)),
(dsaz.util, 'wait_for_files', mock.MagicMock(
@@ -642,7 +644,9 @@ fdescfs /dev/fd fdescfs rw 0 0
self.assertEqual(netconfig, expected_config)
-class TestAzureBounce(TestCase):
+class TestAzureBounce(CiTestCase):
+
+ with_logs = True
def mock_out_azure_moving_parts(self):
self.patches.enter_context(
@@ -655,6 +659,8 @@ class TestAzureBounce(TestCase):
self.patches.enter_context(
mock.patch.object(dsaz, 'get_metadata_from_fabric',
mock.MagicMock(return_value={})))
+ self.patches.enter_context(
+ mock.patch.object(dsaz.util, 'which', lambda x: True))
def _dmi_mocks(key):
if key == 'system-uuid':
@@ -669,10 +675,10 @@ class TestAzureBounce(TestCase):
def setUp(self):
super(TestAzureBounce, self).setUp()
- self.tmp = tempfile.mkdtemp()
+ self.tmp = self.tmp_dir()
self.waagent_d = os.path.join(self.tmp, 'var', 'lib', 'waagent')
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
self.patches = ExitStack()
self.mock_out_azure_moving_parts()
@@ -714,21 +720,24 @@ class TestAzureBounce(TestCase):
def test_disabled_bounce_does_not_change_hostname(self):
cfg = {'hostname_bounce': {'policy': 'off'}}
- self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
+ ds = self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg))
+ ds.get_data()
self.assertEqual(0, self.set_hostname.call_count)
@mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
def test_disabled_bounce_does_not_perform_bounce(
self, perform_hostname_bounce):
cfg = {'hostname_bounce': {'policy': 'off'}}
- self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg)).get_data()
+ ds = self._get_ds(self.get_ovf_env_with_dscfg('test-host', cfg))
+ ds.get_data()
self.assertEqual(0, perform_hostname_bounce.call_count)
def test_same_hostname_does_not_change_hostname(self):
host_name = 'unchanged-host-name'
self.get_hostname.return_value = host_name
cfg = {'hostname_bounce': {'policy': 'yes'}}
- self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ ds = self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg))
+ ds.get_data()
self.assertEqual(0, self.set_hostname.call_count)
@mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
@@ -737,7 +746,8 @@ class TestAzureBounce(TestCase):
host_name = 'unchanged-host-name'
self.get_hostname.return_value = host_name
cfg = {'hostname_bounce': {'policy': 'yes'}}
- self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg)).get_data()
+ ds = self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg))
+ ds.get_data()
self.assertEqual(0, perform_hostname_bounce.call_count)
@mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
@@ -751,6 +761,22 @@ class TestAzureBounce(TestCase):
self.assertTrue(ret)
self.assertEqual(1, perform_hostname_bounce.call_count)
+ def test_bounce_skipped_on_ifupdown_absent(self):
+ host_name = 'unchanged-host-name'
+ self.get_hostname.return_value = host_name
+ cfg = {'hostname_bounce': {'policy': 'force'}}
+ dsrc = self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg),
+ agent_command=['not', '__builtin__'])
+ patch_path = 'cloudinit.sources.DataSourceAzure.util.which'
+ with mock.patch(patch_path) as m_which:
+ m_which.return_value = None
+ ret = self._get_and_setup(dsrc)
+ self.assertEqual([mock.call('ifup')], m_which.call_args_list)
+ self.assertTrue(ret)
+ self.assertIn(
+ "Skipping network bounce: ifupdown utils aren't present.",
+ self.logs.getvalue())
+
def test_different_hostnames_sets_hostname(self):
expected_hostname = 'azure-expected-host-name'
self.get_hostname.return_value = 'default-host-name'
@@ -815,9 +841,7 @@ class TestAzureBounce(TestCase):
self.assertEqual(hostname, bounce_env['hostname'])
self.assertEqual(old_hostname, bounce_env['old_hostname'])
- def test_default_bounce_command_used_by_default(self):
- cmd = 'default-bounce-command'
- dsaz.BUILTIN_DS_CONFIG['hostname_bounce']['command'] = cmd
+ def test_default_bounce_command_ifup_used_by_default(self):
cfg = {'hostname_bounce': {'policy': 'force'}}
data = self.get_ovf_env_with_dscfg('some-hostname', cfg)
dsrc = self._get_ds(data, agent_command=['not', '__builtin__'])
@@ -825,7 +849,8 @@ class TestAzureBounce(TestCase):
self.assertTrue(ret)
self.assertEqual(1, self.subp.call_count)
bounce_args = self.subp.call_args[1]['args']
- self.assertEqual(cmd, bounce_args)
+ self.assertEqual(
+ dsaz.BOUNCE_COMMAND_IFUP, bounce_args)
@mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
def test_set_hostname_option_can_disable_bounce(
@@ -895,9 +920,6 @@ class TestCanDevBeReformatted(CiTestCase):
setattr(self, sattr, patcher.start())
self.addCleanup(patcher.stop)
- def setUp(self):
- super(TestCanDevBeReformatted, self).setUp()
-
def patchup(self, devs):
bypath = {}
for path, data in devs.items():
@@ -952,14 +974,14 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda3': {'num': 3},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertFalse(False, value)
+ self.assertFalse(value)
self.assertIn("3 or more", msg.lower())
def test_no_partitions_is_false(self):
"""A disk with no partitions can not be formatted."""
self.patchup({'/dev/sda': {}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(False, value)
+ self.assertFalse(value)
self.assertIn("not partitioned", msg.lower())
def test_two_partitions_not_ntfs_false(self):
@@ -971,7 +993,7 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda2': {'num': 2, 'fs': 'ext4', 'files': []},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertFalse(False, value)
+ self.assertFalse(value)
self.assertIn("not ntfs", msg.lower())
def test_two_partitions_ntfs_populated_false(self):
@@ -984,7 +1006,7 @@ class TestCanDevBeReformatted(CiTestCase):
'files': ['secret.txt']},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertFalse(False, value)
+ self.assertFalse(value)
self.assertIn("files on it", msg.lower())
def test_two_partitions_ntfs_empty_is_true(self):
@@ -996,7 +1018,7 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda2': {'num': 2, 'fs': 'ntfs', 'files': []},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(True, value)
+ self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_one_partition_not_ntfs_false(self):
@@ -1007,7 +1029,7 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda1': {'num': 1, 'fs': 'zfs'},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(False, value)
+ self.assertFalse(value)
self.assertIn("not ntfs", msg.lower())
def test_one_partition_ntfs_populated_false(self):
@@ -1019,7 +1041,7 @@ class TestCanDevBeReformatted(CiTestCase):
'files': ['file1.txt', 'file2.exe']},
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(False, value)
+ self.assertFalse(value)
self.assertIn("files on it", msg.lower())
def test_one_partition_ntfs_empty_is_true(self):
@@ -1030,7 +1052,7 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda1': {'num': 1, 'fs': 'ntfs', 'files': []}
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(True, value)
+ self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_one_partition_ntfs_empty_with_dataloss_file_is_true(self):
@@ -1042,7 +1064,7 @@ class TestCanDevBeReformatted(CiTestCase):
'files': ['dataloss_warning_readme.txt']}
}}})
value, msg = dsaz.can_dev_be_reformatted("/dev/sda")
- self.assertEqual(True, value)
+ self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_one_partition_through_realpath_is_true(self):
@@ -1057,7 +1079,7 @@ class TestCanDevBeReformatted(CiTestCase):
'realpath': '/dev/sdb1'}
}}})
value, msg = dsaz.can_dev_be_reformatted(epath)
- self.assertEqual(True, value)
+ self.assertTrue(value)
self.assertIn("safe for", msg.lower())
def test_three_partition_through_realpath_is_false(self):
@@ -1076,7 +1098,7 @@ class TestCanDevBeReformatted(CiTestCase):
'realpath': '/dev/sdb3'}
}}})
value, msg = dsaz.can_dev_be_reformatted(epath)
- self.assertEqual(False, value)
+ self.assertFalse(value)
self.assertIn("3 or more", msg.lower())
@@ -1088,4 +1110,146 @@ class TestAzureNetExists(CiTestCase):
self.assertTrue(hasattr(dsaz, "DataSourceAzureNet"))
+@mock.patch('cloudinit.sources.DataSourceAzure.util.subp')
+@mock.patch.object(dsaz, 'get_hostname')
+@mock.patch.object(dsaz, 'set_hostname')
+class TestAzureDataSourcePreprovisioning(CiTestCase):
+
+ def setUp(self):
+ super(TestAzureDataSourcePreprovisioning, self).setUp()
+ tmp = self.tmp_dir()
+ self.waagent_d = self.tmp_path('/var/lib/waagent', tmp)
+ self.paths = helpers.Paths({'cloud_dir': tmp})
+ dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
+
+ def test_read_azure_ovf_with_true_flag(self, *args):
+ """The read_azure_ovf method should set the PreprovisionedVM
+ cfg flag if the proper setting is present."""
+ content = construct_valid_ovf_env(
+ platform_settings={"PreprovisionedVm": "True"})
+ ret = dsaz.read_azure_ovf(content)
+ cfg = ret[2]
+ self.assertTrue(cfg['PreprovisionedVm'])
+
+ def test_read_azure_ovf_with_false_flag(self, *args):
+ """The read_azure_ovf method should set the PreprovisionedVM
+ cfg flag to false if the proper setting is false."""
+ content = construct_valid_ovf_env(
+ platform_settings={"PreprovisionedVm": "False"})
+ ret = dsaz.read_azure_ovf(content)
+ cfg = ret[2]
+ self.assertFalse(cfg['PreprovisionedVm'])
+
+ def test_read_azure_ovf_without_flag(self, *args):
+ """The read_azure_ovf method should not set the
+ PreprovisionedVM cfg flag."""
+ content = construct_valid_ovf_env()
+ ret = dsaz.read_azure_ovf(content)
+ cfg = ret[2]
+ self.assertFalse(cfg['PreprovisionedVm'])
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.is_FreeBSD')
+ @mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
+ @mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
+ @mock.patch('requests.Session.request')
+ def test_poll_imds_returns_ovf_env(self, fake_resp, m_dhcp, m_net,
+ m_is_bsd, *args):
+ """The _poll_imds method should return the ovf_env.xml."""
+ m_is_bsd.return_value = False
+ m_dhcp.return_value = [{
+ 'interface': 'eth9', 'fixed-address': '192.168.2.9',
+ 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0'}]
+ url = 'http://{0}/metadata/reprovisiondata?api-version=2017-04-02'
+ host = "169.254.169.254"
+ full_url = url.format(host)
+ fake_resp.return_value = mock.MagicMock(status_code=200, text="ovf")
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ self.assertTrue(len(dsa._poll_imds()) > 0)
+ self.assertEqual(fake_resp.call_args_list,
+ [mock.call(allow_redirects=True,
+ headers={'Metadata': 'true',
+ 'User-Agent':
+ 'Cloud-Init/%s' % vs()
+ }, method='GET', timeout=60.0,
+ url=full_url),
+ mock.call(allow_redirects=True,
+ headers={'Metadata': 'true',
+ 'User-Agent':
+ 'Cloud-Init/%s' % vs()
+ }, method='GET', url=full_url)])
+ self.assertEqual(m_dhcp.call_count, 1)
+ m_net.assert_any_call(
+ broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
+ prefix_or_mask='255.255.255.0', router='192.168.2.1')
+ self.assertEqual(m_net.call_count, 1)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.is_FreeBSD')
+ @mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
+ @mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
+ @mock.patch('requests.Session.request')
+ def test__reprovision_calls__poll_imds(self, fake_resp, m_dhcp, m_net,
+ m_is_bsd, *args):
+ """The _reprovision method should call poll IMDS."""
+ m_is_bsd.return_value = False
+ m_dhcp.return_value = [{
+ 'interface': 'eth9', 'fixed-address': '192.168.2.9',
+ 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
+ 'unknown-245': '624c3620'}]
+ url = 'http://{0}/metadata/reprovisiondata?api-version=2017-04-02'
+ host = "169.254.169.254"
+ full_url = url.format(host)
+ hostname = "myhost"
+ username = "myuser"
+ odata = {'HostName': hostname, 'UserName': username}
+ content = construct_valid_ovf_env(data=odata)
+ fake_resp.return_value = mock.MagicMock(status_code=200, text=content)
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ md, ud, cfg, d = dsa._reprovision()
+ self.assertEqual(md['local-hostname'], hostname)
+ self.assertEqual(cfg['system_info']['default_user']['name'], username)
+ self.assertEqual(fake_resp.call_args_list,
+ [mock.call(allow_redirects=True,
+ headers={'Metadata': 'true',
+ 'User-Agent':
+ 'Cloud-Init/%s' % vs()},
+ method='GET', timeout=60.0, url=full_url),
+ mock.call(allow_redirects=True,
+ headers={'Metadata': 'true',
+ 'User-Agent':
+ 'Cloud-Init/%s' % vs()},
+ method='GET', url=full_url)])
+ self.assertEqual(m_dhcp.call_count, 1)
+ m_net.assert_any_call(
+ broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
+ prefix_or_mask='255.255.255.0', router='192.168.2.1')
+ self.assertEqual(m_net.call_count, 1)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
+ @mock.patch('os.path.isfile')
+ def test__should_reprovision_with_true_cfg(self, isfile, write_f, *args):
+ """The _should_reprovision method should return true with config
+ flag present."""
+ isfile.return_value = False
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ self.assertTrue(dsa._should_reprovision(
+ (None, None, {'PreprovisionedVm': True}, None)))
+
+ @mock.patch('os.path.isfile')
+ def test__should_reprovision_with_file_existing(self, isfile, *args):
+ """The _should_reprovision method should return True if the sentinal
+ exists."""
+ isfile.return_value = True
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ self.assertTrue(dsa._should_reprovision(
+ (None, None, {'preprovisionedvm': False}, None)))
+
+ @mock.patch('os.path.isfile')
+ def test__should_reprovision_returns_false(self, isfile, *args):
+ """The _should_reprovision method should return False
+ if config and sentinal are not present."""
+ isfile.return_value = False
+ dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
+ self.assertFalse(dsa._should_reprovision((None, None, {}, None)))
+
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index e4c59907..f6a59b6b 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -3,6 +3,7 @@
import copy
from cloudinit.cs_utils import Cepko
+from cloudinit import helpers
from cloudinit import sources
from cloudinit.sources import DataSourceCloudSigma
@@ -38,10 +39,12 @@ class CepkoMock(Cepko):
return self
-class DataSourceCloudSigmaTest(test_helpers.TestCase):
+class DataSourceCloudSigmaTest(test_helpers.CiTestCase):
def setUp(self):
super(DataSourceCloudSigmaTest, self).setUp()
- self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
+ self.paths = helpers.Paths({'run_dir': self.tmp_dir()})
+ self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
+ "", "", paths=self.paths)
self.datasource.is_running_in_cloudsigma = lambda: True
self.datasource.cepko = CepkoMock(SERVER_CONTEXT)
self.datasource.get_data()
@@ -85,7 +88,8 @@ class DataSourceCloudSigmaTest(test_helpers.TestCase):
def test_lack_of_vendor_data(self):
stripped_context = copy.deepcopy(SERVER_CONTEXT)
del stripped_context["vendor_data"]
- self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
+ self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
+ "", "", paths=self.paths)
self.datasource.cepko = CepkoMock(stripped_context)
self.datasource.get_data()
@@ -94,7 +98,8 @@ class DataSourceCloudSigmaTest(test_helpers.TestCase):
def test_lack_of_cloudinit_key_in_vendor_data(self):
stripped_context = copy.deepcopy(SERVER_CONTEXT)
del stripped_context["vendor_data"]["cloudinit"]
- self.datasource = DataSourceCloudSigma.DataSourceCloudSigma("", "", "")
+ self.datasource = DataSourceCloudSigma.DataSourceCloudSigma(
+ "", "", paths=self.paths)
self.datasource.cepko = CepkoMock(stripped_context)
self.datasource.get_data()
diff --git a/tests/unittests/test_datasource/test_cloudstack.py b/tests/unittests/test_datasource/test_cloudstack.py
index 96144b64..d6d2d6b2 100644
--- a/tests/unittests/test_datasource/test_cloudstack.py
+++ b/tests/unittests/test_datasource/test_cloudstack.py
@@ -33,6 +33,7 @@ class TestCloudStackPasswordFetching(CiTestCase):
self.patches.enter_context(mock.patch(
mod_name + '.dhcp.networkd_get_option_from_leases',
get_networkd_server_address))
+ self.tmp = self.tmp_dir()
def _set_password_server_response(self, response_string):
subp = mock.MagicMock(return_value=(response_string, ''))
@@ -43,26 +44,30 @@ class TestCloudStackPasswordFetching(CiTestCase):
def test_empty_password_doesnt_create_config(self):
self._set_password_server_response('')
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds = DataSourceCloudStack(
+ {}, None, helpers.Paths({'run_dir': self.tmp}))
ds.get_data()
self.assertEqual({}, ds.get_config_obj())
def test_saved_password_doesnt_create_config(self):
self._set_password_server_response('saved_password')
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds = DataSourceCloudStack(
+ {}, None, helpers.Paths({'run_dir': self.tmp}))
ds.get_data()
self.assertEqual({}, ds.get_config_obj())
def test_password_sets_password(self):
password = 'SekritSquirrel'
self._set_password_server_response(password)
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds = DataSourceCloudStack(
+ {}, None, helpers.Paths({'run_dir': self.tmp}))
ds.get_data()
self.assertEqual(password, ds.get_config_obj()['password'])
def test_bad_request_doesnt_stop_ds_from_working(self):
self._set_password_server_response('bad_request')
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds = DataSourceCloudStack(
+ {}, None, helpers.Paths({'run_dir': self.tmp}))
self.assertTrue(ds.get_data())
def assertRequestTypesSent(self, subp, expected_request_types):
@@ -77,14 +82,16 @@ class TestCloudStackPasswordFetching(CiTestCase):
def test_valid_response_means_password_marked_as_saved(self):
password = 'SekritSquirrel'
subp = self._set_password_server_response(password)
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds = DataSourceCloudStack(
+ {}, None, helpers.Paths({'run_dir': self.tmp}))
ds.get_data()
self.assertRequestTypesSent(subp,
['send_my_password', 'saved_password'])
def _check_password_not_saved_for(self, response_string):
subp = self._set_password_server_response(response_string)
- ds = DataSourceCloudStack({}, None, helpers.Paths({}))
+ ds = DataSourceCloudStack(
+ {}, None, helpers.Paths({'run_dir': self.tmp}))
ds.get_data()
self.assertRequestTypesSent(subp, ['send_my_password'])
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 237c189b..68400f22 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -3,9 +3,6 @@
from copy import copy, deepcopy
import json
import os
-import shutil
-import six
-import tempfile
from cloudinit import helpers
from cloudinit.net import eni
@@ -15,7 +12,7 @@ from cloudinit.sources import DataSourceConfigDrive as ds
from cloudinit.sources.helpers import openstack
from cloudinit import util
-from cloudinit.tests.helpers import TestCase, ExitStack, mock
+from cloudinit.tests.helpers import CiTestCase, ExitStack, mock, populate_dir
PUBKEY = u'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460\n'
@@ -223,12 +220,11 @@ CFG_DRIVE_FILES_V2 = {
'openstack/2015-10-15/network_data.json': json.dumps(NETWORK_DATA)}
-class TestConfigDriveDataSource(TestCase):
+class TestConfigDriveDataSource(CiTestCase):
def setUp(self):
super(TestConfigDriveDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.tmp = self.tmp_dir()
def test_ec2_metadata(self):
populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
@@ -462,6 +458,12 @@ class TestConfigDriveDataSource(TestCase):
self.assertEqual(["/dev/vdb3"],
ds.find_candidate_devs())
+ # Verify that uppercase labels are also found.
+ devs_with_answers = {"TYPE=vfat": [],
+ "TYPE=iso9660": ["/dev/vdb"],
+ "LABEL=CONFIG-2": ["/dev/vdb"]}
+ self.assertEqual(["/dev/vdb"], ds.find_candidate_devs())
+
finally:
util.find_devs_with = orig_find_devs_with
util.is_partition = orig_is_partition
@@ -469,31 +471,27 @@ class TestConfigDriveDataSource(TestCase):
@mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
def test_pubkeys_v2(self, on_first_boot):
"""Verify that public-keys work in config-drive-v2."""
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- myds = cfg_ds_from_dir(self.tmp)
+ myds = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
self.assertEqual(myds.get_public_ssh_keys(),
[OSTACK_META['public_keys']['mykey']])
-class TestNetJson(TestCase):
+class TestNetJson(CiTestCase):
def setUp(self):
super(TestNetJson, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.tmp = self.tmp_dir()
self.maxDiff = None
@mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
def test_network_data_is_found(self, on_first_boot):
"""Verify that network_data is present in ds in config-drive-v2."""
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- myds = cfg_ds_from_dir(self.tmp)
+ myds = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
self.assertIsNotNone(myds.network_json)
@mock.patch('cloudinit.sources.DataSourceConfigDrive.on_first_boot')
def test_network_config_is_converted(self, on_first_boot):
"""Verify that network_data is converted and present on ds object."""
- populate_dir(self.tmp, CFG_DRIVE_FILES_V2)
- myds = cfg_ds_from_dir(self.tmp)
+ myds = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
network_config = openstack.convert_net_json(NETWORK_DATA,
known_macs=KNOWN_MACS)
self.assertEqual(myds.network_config, network_config)
@@ -598,11 +596,10 @@ class TestNetJson(TestCase):
self.assertEqual(out_data, conv_data)
-class TestConvertNetworkData(TestCase):
+class TestConvertNetworkData(CiTestCase):
def setUp(self):
super(TestConvertNetworkData, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
+ self.tmp = self.tmp_dir()
def _getnames_in_config(self, ncfg):
return set([n['name'] for n in ncfg['config']
@@ -724,14 +721,18 @@ class TestConvertNetworkData(TestCase):
self.assertEqual(expected, config_name2mac)
-def cfg_ds_from_dir(seed_d):
- cfg_ds = ds.DataSourceConfigDrive(settings.CFG_BUILTIN, None,
- helpers.Paths({}))
- cfg_ds.seed_dir = seed_d
+def cfg_ds_from_dir(base_d, files=None):
+ run = os.path.join(base_d, "run")
+ os.mkdir(run)
+ cfg_ds = ds.DataSourceConfigDrive(
+ settings.CFG_BUILTIN, None, helpers.Paths({'run_dir': run}))
+ cfg_ds.seed_dir = os.path.join(base_d, "seed")
+ if files:
+ populate_dir(cfg_ds.seed_dir, files)
cfg_ds.known_macs = KNOWN_MACS.copy()
if not cfg_ds.get_data():
raise RuntimeError("Data source did not extract itself from"
- " seed directory %s" % seed_d)
+ " seed directory %s" % cfg_ds.seed_dir)
return cfg_ds
@@ -749,17 +750,4 @@ def populate_ds_from_read_config(cfg_ds, source, results):
cfg_ds.network_json, known_macs=KNOWN_MACS)
-def populate_dir(seed_dir, files):
- for (name, content) in files.items():
- path = os.path.join(seed_dir, name)
- dirname = os.path.dirname(path)
- if not os.path.isdir(dirname):
- os.makedirs(dirname)
- if isinstance(content, six.text_type):
- mode = "w"
- else:
- mode = "wb"
- with open(path, mode) as fp:
- fp.write(content)
-
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_digitalocean.py b/tests/unittests/test_datasource/test_digitalocean.py
index f264f361..3127014b 100644
--- a/tests/unittests/test_datasource/test_digitalocean.py
+++ b/tests/unittests/test_datasource/test_digitalocean.py
@@ -13,7 +13,7 @@ from cloudinit import settings
from cloudinit.sources import DataSourceDigitalOcean
from cloudinit.sources.helpers import digitalocean
-from cloudinit.tests.helpers import mock, TestCase
+from cloudinit.tests.helpers import mock, CiTestCase
DO_MULTIPLE_KEYS = ["ssh-rsa AAAAB3NzaC1yc2EAAAA... test1@do.co",
"ssh-rsa AAAAB3NzaC1yc2EAAAA... test2@do.co"]
@@ -135,14 +135,17 @@ def _mock_dmi():
return (True, DO_META.get('id'))
-class TestDataSourceDigitalOcean(TestCase):
+class TestDataSourceDigitalOcean(CiTestCase):
"""
Test reading the meta-data
"""
+ def setUp(self):
+ super(TestDataSourceDigitalOcean, self).setUp()
+ self.tmp = self.tmp_dir()
def get_ds(self, get_sysinfo=_mock_dmi):
ds = DataSourceDigitalOcean.DataSourceDigitalOcean(
- settings.CFG_BUILTIN, None, helpers.Paths({}))
+ settings.CFG_BUILTIN, None, helpers.Paths({'run_dir': self.tmp}))
ds.use_ip4LL = False
if get_sysinfo is not None:
ds._get_sysinfo = get_sysinfo
@@ -194,11 +197,10 @@ class TestDataSourceDigitalOcean(TestCase):
self.assertIsInstance(ds.get_public_ssh_keys(), list)
-class TestNetworkConvert(TestCase):
+class TestNetworkConvert(CiTestCase):
- @mock.patch('cloudinit.net.get_interfaces_by_mac')
- def _get_networking(self, m_get_by_mac):
- m_get_by_mac.return_value = {
+ def _get_networking(self):
+ self.m_get_by_mac.return_value = {
'04:01:57:d1:9e:01': 'ens1',
'04:01:57:d1:9e:02': 'ens2',
'b8:ae:ed:75:5f:9a': 'enp0s25',
@@ -208,6 +210,10 @@ class TestNetworkConvert(TestCase):
self.assertIn('config', netcfg)
return netcfg
+ def setUp(self):
+ super(TestNetworkConvert, self).setUp()
+ self.add_patch('cloudinit.net.get_interfaces_by_mac', 'm_get_by_mac')
+
def test_networking_defined(self):
netcfg = self._get_networking()
self.assertIsNotNone(netcfg)
diff --git a/tests/unittests/test_datasource/test_ec2.py b/tests/unittests/test_datasource/test_ec2.py
index ba328ee9..0f7267bb 100644
--- a/tests/unittests/test_datasource/test_ec2.py
+++ b/tests/unittests/test_datasource/test_ec2.py
@@ -186,6 +186,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
super(TestEc2, self).setUp()
self.datasource = ec2.DataSourceEc2
self.metadata_addr = self.datasource.metadata_urls[0]
+ self.tmp = self.tmp_dir()
def data_url(self, version):
"""Return a metadata url based on the version provided."""
@@ -199,7 +200,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
def _setup_ds(self, sys_cfg, platform_data, md, md_version=None):
self.uris = []
distro = {}
- paths = helpers.Paths({})
+ paths = helpers.Paths({'run_dir': self.tmp})
if sys_cfg is None:
sys_cfg = {}
ds = self.datasource(sys_cfg=sys_cfg, distro=distro, paths=paths)
@@ -329,7 +330,8 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds.fallback_nic = 'eth9'
with mock.patch(get_interface_mac_path) as m_get_interface_mac:
m_get_interface_mac.return_value = mac1
- ds.network_config # Will re-crawl network metadata
+ nc = ds.network_config # Will re-crawl network metadata
+ self.assertIsNotNone(nc)
self.assertIn('Re-crawl of metadata service', self.logs.getvalue())
expected = {'version': 1, 'config': [
{'mac_address': '06:17:04:d7:26:09',
@@ -423,7 +425,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
self.logs.getvalue())
@httpretty.activate
- @mock.patch('cloudinit.net.EphemeralIPv4Network')
+ @mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
@mock.patch('cloudinit.net.find_fallback_nic')
@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
@mock.patch('cloudinit.sources.DataSourceEc2.util.is_FreeBSD')
diff --git a/tests/unittests/test_datasource/test_gce.py b/tests/unittests/test_datasource/test_gce.py
index d399ae7a..f77c2c40 100644
--- a/tests/unittests/test_datasource/test_gce.py
+++ b/tests/unittests/test_datasource/test_gce.py
@@ -4,13 +4,16 @@
#
# This file is part of cloud-init. See LICENSE file for license information.
+import datetime
import httpretty
+import json
import mock
import re
from base64 import b64encode, b64decode
from six.moves.urllib_parse import urlparse
+from cloudinit import distros
from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceGCE
@@ -21,10 +24,7 @@ from cloudinit.tests import helpers as test_helpers
GCE_META = {
'instance/id': '123',
'instance/zone': 'foo/bar',
- 'project/attributes/sshKeys': 'user:ssh-rsa AA2..+aRD0fyVw== root@server',
'instance/hostname': 'server.project-foo.local',
- # UnicodeDecodeError below if set to ds.userdata instead of userdata_raw
- 'instance/attributes/user-data': b'/bin/echo \xff\n',
}
GCE_META_PARTIAL = {
@@ -37,11 +37,13 @@ GCE_META_ENCODING = {
'instance/id': '12345',
'instance/hostname': 'server.project-baz.local',
'instance/zone': 'baz/bang',
- 'instance/attributes/user-data': b64encode(b'/bin/echo baz\n'),
- 'instance/attributes/user-data-encoding': 'base64',
+ 'instance/attributes': {
+ 'user-data': b64encode(b'/bin/echo baz\n').decode('utf-8'),
+ 'user-data-encoding': 'base64',
+ }
}
-HEADERS = {'X-Google-Metadata-Request': 'True'}
+HEADERS = {'Metadata-Flavor': 'Google'}
MD_URL_RE = re.compile(
r'http://metadata.google.internal/computeMetadata/v1/.*')
@@ -54,10 +56,15 @@ def _set_mock_metadata(gce_meta=None):
url_path = urlparse(uri).path
if url_path.startswith('/computeMetadata/v1/'):
path = url_path.split('/computeMetadata/v1/')[1:][0]
+ recursive = path.endswith('/')
+ path = path.rstrip('/')
else:
path = None
if path in gce_meta:
- return (200, headers, gce_meta.get(path))
+ response = gce_meta.get(path)
+ if recursive:
+ response = json.dumps(response)
+ return (200, headers, response)
else:
return (404, headers, '')
@@ -69,10 +76,21 @@ def _set_mock_metadata(gce_meta=None):
@httpretty.activate
class TestDataSourceGCE(test_helpers.HttprettyTestCase):
+ def _make_distro(self, dtype, def_user=None):
+ cfg = dict(settings.CFG_BUILTIN)
+ cfg['system_info']['distro'] = dtype
+ paths = helpers.Paths(cfg['system_info']['paths'])
+ distro_cls = distros.fetch(dtype)
+ if def_user:
+ cfg['system_info']['default_user'] = def_user.copy()
+ distro = distro_cls(dtype, cfg['system_info'], paths)
+ return distro
+
def setUp(self):
+ tmp = self.tmp_dir()
self.ds = DataSourceGCE.DataSourceGCE(
settings.CFG_BUILTIN, None,
- helpers.Paths({}))
+ helpers.Paths({'run_dir': tmp}))
ppatch = self.m_platform_reports_gce = mock.patch(
'cloudinit.sources.DataSourceGCE.platform_reports_gce')
self.m_platform_reports_gce = ppatch.start()
@@ -89,6 +107,10 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertDictContainsSubset(HEADERS, req_header)
def test_metadata(self):
+ # UnicodeDecodeError if set to ds.userdata instead of userdata_raw
+ meta = GCE_META.copy()
+ meta['instance/attributes/user-data'] = b'/bin/echo \xff\n'
+
_set_mock_metadata()
self.ds.get_data()
@@ -117,8 +139,8 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
_set_mock_metadata(GCE_META_ENCODING)
self.ds.get_data()
- decoded = b64decode(
- GCE_META_ENCODING.get('instance/attributes/user-data'))
+ instance_data = GCE_META_ENCODING.get('instance/attributes')
+ decoded = b64decode(instance_data.get('user-data'))
self.assertEqual(decoded, self.ds.get_userdata_raw())
def test_missing_required_keys_return_false(self):
@@ -130,33 +152,124 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertEqual(False, self.ds.get_data())
httpretty.reset()
- def test_project_level_ssh_keys_are_used(self):
+ def test_no_ssh_keys_metadata(self):
_set_mock_metadata()
self.ds.get_data()
+ self.assertEqual([], self.ds.get_public_ssh_keys())
+
+ def test_cloudinit_ssh_keys(self):
+ valid_key = 'ssh-rsa VALID {0}'
+ invalid_key = 'ssh-rsa INVALID {0}'
+ project_attributes = {
+ 'sshKeys': '\n'.join([
+ 'cloudinit:{0}'.format(valid_key.format(0)),
+ 'user:{0}'.format(invalid_key.format(0)),
+ ]),
+ 'ssh-keys': '\n'.join([
+ 'cloudinit:{0}'.format(valid_key.format(1)),
+ 'user:{0}'.format(invalid_key.format(1)),
+ ]),
+ }
+ instance_attributes = {
+ 'ssh-keys': '\n'.join([
+ 'cloudinit:{0}'.format(valid_key.format(2)),
+ 'user:{0}'.format(invalid_key.format(2)),
+ ]),
+ 'block-project-ssh-keys': 'False',
+ }
+
+ meta = GCE_META.copy()
+ meta['project/attributes'] = project_attributes
+ meta['instance/attributes'] = instance_attributes
+
+ _set_mock_metadata(meta)
+ self.ds.get_data()
+
+ expected = [valid_key.format(key) for key in range(3)]
+ self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys()))
+
+ @mock.patch("cloudinit.sources.DataSourceGCE.ug_util")
+ def test_default_user_ssh_keys(self, mock_ug_util):
+ mock_ug_util.normalize_users_groups.return_value = None, None
+ mock_ug_util.extract_default.return_value = 'ubuntu', None
+ ubuntu_ds = DataSourceGCE.DataSourceGCE(
+ settings.CFG_BUILTIN, self._make_distro('ubuntu'),
+ helpers.Paths({'run_dir': self.tmp_dir()}))
+
+ valid_key = 'ssh-rsa VALID {0}'
+ invalid_key = 'ssh-rsa INVALID {0}'
+ project_attributes = {
+ 'sshKeys': '\n'.join([
+ 'ubuntu:{0}'.format(valid_key.format(0)),
+ 'user:{0}'.format(invalid_key.format(0)),
+ ]),
+ 'ssh-keys': '\n'.join([
+ 'ubuntu:{0}'.format(valid_key.format(1)),
+ 'user:{0}'.format(invalid_key.format(1)),
+ ]),
+ }
+ instance_attributes = {
+ 'ssh-keys': '\n'.join([
+ 'ubuntu:{0}'.format(valid_key.format(2)),
+ 'user:{0}'.format(invalid_key.format(2)),
+ ]),
+ 'block-project-ssh-keys': 'False',
+ }
- # we expect a list of public ssh keys with user names stripped
- self.assertEqual(['ssh-rsa AA2..+aRD0fyVw== root@server'],
- self.ds.get_public_ssh_keys())
+ meta = GCE_META.copy()
+ meta['project/attributes'] = project_attributes
+ meta['instance/attributes'] = instance_attributes
+
+ _set_mock_metadata(meta)
+ ubuntu_ds.get_data()
+
+ expected = [valid_key.format(key) for key in range(3)]
+ self.assertEqual(set(expected), set(ubuntu_ds.get_public_ssh_keys()))
+
+ def test_instance_ssh_keys_override(self):
+ valid_key = 'ssh-rsa VALID {0}'
+ invalid_key = 'ssh-rsa INVALID {0}'
+ project_attributes = {
+ 'sshKeys': 'cloudinit:{0}'.format(invalid_key.format(0)),
+ 'ssh-keys': 'cloudinit:{0}'.format(invalid_key.format(1)),
+ }
+ instance_attributes = {
+ 'sshKeys': 'cloudinit:{0}'.format(valid_key.format(0)),
+ 'ssh-keys': 'cloudinit:{0}'.format(valid_key.format(1)),
+ 'block-project-ssh-keys': 'False',
+ }
- def test_instance_level_ssh_keys_are_used(self):
- key_content = 'ssh-rsa JustAUser root@server'
meta = GCE_META.copy()
- meta['instance/attributes/ssh-keys'] = 'user:{0}'.format(key_content)
+ meta['project/attributes'] = project_attributes
+ meta['instance/attributes'] = instance_attributes
_set_mock_metadata(meta)
self.ds.get_data()
- self.assertIn(key_content, self.ds.get_public_ssh_keys())
+ expected = [valid_key.format(key) for key in range(2)]
+ self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys()))
+
+ def test_block_project_ssh_keys_override(self):
+ valid_key = 'ssh-rsa VALID {0}'
+ invalid_key = 'ssh-rsa INVALID {0}'
+ project_attributes = {
+ 'sshKeys': 'cloudinit:{0}'.format(invalid_key.format(0)),
+ 'ssh-keys': 'cloudinit:{0}'.format(invalid_key.format(1)),
+ }
+ instance_attributes = {
+ 'ssh-keys': 'cloudinit:{0}'.format(valid_key.format(0)),
+ 'block-project-ssh-keys': 'True',
+ }
- def test_instance_level_keys_replace_project_level_keys(self):
- key_content = 'ssh-rsa JustAUser root@server'
meta = GCE_META.copy()
- meta['instance/attributes/ssh-keys'] = 'user:{0}'.format(key_content)
+ meta['project/attributes'] = project_attributes
+ meta['instance/attributes'] = instance_attributes
_set_mock_metadata(meta)
self.ds.get_data()
- self.assertEqual([key_content], self.ds.get_public_ssh_keys())
+ expected = [valid_key.format(0)]
+ self.assertEqual(set(expected), set(self.ds.get_public_ssh_keys()))
def test_only_last_part_of_zone_used_for_availability_zone(self):
_set_mock_metadata()
@@ -171,5 +284,44 @@ class TestDataSourceGCE(test_helpers.HttprettyTestCase):
self.assertEqual(False, ret)
m_fetcher.assert_not_called()
+ def test_has_expired(self):
+
+ def _get_timestamp(days):
+ format_str = '%Y-%m-%dT%H:%M:%S+0000'
+ today = datetime.datetime.now()
+ timestamp = today + datetime.timedelta(days=days)
+ return timestamp.strftime(format_str)
+
+ past = _get_timestamp(-1)
+ future = _get_timestamp(1)
+ ssh_keys = {
+ None: False,
+ '': False,
+ 'Invalid': False,
+ 'user:ssh-rsa key user@domain.com': False,
+ 'user:ssh-rsa key google {"expireOn":"%s"}' % past: False,
+ 'user:ssh-rsa key google-ssh': False,
+ 'user:ssh-rsa key google-ssh {invalid:json}': False,
+ 'user:ssh-rsa key google-ssh {"userName":"user"}': False,
+ 'user:ssh-rsa key google-ssh {"expireOn":"invalid"}': False,
+ 'user:xyz key google-ssh {"expireOn":"%s"}' % future: False,
+ 'user:xyz key google-ssh {"expireOn":"%s"}' % past: True,
+ }
+
+ for key, expired in ssh_keys.items():
+ self.assertEqual(DataSourceGCE._has_expired(key), expired)
+
+ def test_parse_public_keys_non_ascii(self):
+ public_key_data = [
+ 'cloudinit:rsa ssh-ke%s invalid' % chr(165),
+ 'use%sname:rsa ssh-key' % chr(174),
+ 'cloudinit:test 1',
+ 'default:test 2',
+ 'user:test 3',
+ ]
+ expected = ['test 1', 'test 2']
+ found = DataSourceGCE._parse_public_keys(
+ public_key_data, default_user='default')
+ self.assertEqual(sorted(found), sorted(expected))
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_maas.py b/tests/unittests/test_datasource/test_maas.py
index 289c6a40..6e4031cf 100644
--- a/tests/unittests/test_datasource/test_maas.py
+++ b/tests/unittests/test_datasource/test_maas.py
@@ -1,6 +1,7 @@
# This file is part of cloud-init. See LICENSE file for license information.
from copy import copy
+import mock
import os
import shutil
import tempfile
@@ -8,15 +9,10 @@ import yaml
from cloudinit.sources import DataSourceMAAS
from cloudinit import url_helper
-from cloudinit.tests.helpers import TestCase, populate_dir
+from cloudinit.tests.helpers import CiTestCase, populate_dir
-try:
- from unittest import mock
-except ImportError:
- import mock
-
-class TestMAASDataSource(TestCase):
+class TestMAASDataSource(CiTestCase):
def setUp(self):
super(TestMAASDataSource, self).setUp()
@@ -159,4 +155,47 @@ class TestMAASDataSource(TestCase):
self.assertEqual(valid['meta-data/instance-id'], md['instance-id'])
self.assertEqual(expected_vd, vd)
+
+@mock.patch("cloudinit.sources.DataSourceMAAS.url_helper.OauthUrlHelper")
+class TestGetOauthHelper(CiTestCase):
+ with_logs = True
+ base_cfg = {'consumer_key': 'FAKE_CONSUMER_KEY',
+ 'token_key': 'FAKE_TOKEN_KEY',
+ 'token_secret': 'FAKE_TOKEN_SECRET',
+ 'consumer_secret': None}
+
+ def test_all_required(self, m_helper):
+ """Valid config as expected."""
+ DataSourceMAAS.get_oauth_helper(self.base_cfg.copy())
+ m_helper.assert_has_calls([mock.call(**self.base_cfg)])
+
+ def test_other_fields_not_passed_through(self, m_helper):
+ """Only relevant fields are passed through."""
+ mycfg = self.base_cfg.copy()
+ mycfg['unrelated_field'] = 'unrelated'
+ DataSourceMAAS.get_oauth_helper(mycfg)
+ m_helper.assert_has_calls([mock.call(**self.base_cfg)])
+
+
+class TestGetIdHash(CiTestCase):
+ v1_cfg = {'consumer_key': 'CKEY', 'token_key': 'TKEY',
+ 'token_secret': 'TSEC'}
+ v1_id = (
+ 'v1:'
+ '403ee5f19c956507f1d0e50814119c405902137ea4f8838bde167c5da8110392')
+
+ def test_v1_expected(self):
+ """Test v1 id generated as expected working behavior from config."""
+ result = DataSourceMAAS.get_id_from_ds_cfg(self.v1_cfg.copy())
+ self.assertEqual(self.v1_id, result)
+
+ def test_v1_extra_fields_are_ignored(self):
+ """Test v1 id ignores unused entries in config."""
+ cfg = self.v1_cfg.copy()
+ cfg['consumer_secret'] = "BOO"
+ cfg['unrelated'] = "HI MOM"
+ result = DataSourceMAAS.get_id_from_ds_cfg(cfg)
+ self.assertEqual(self.v1_id, result)
+
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index fea9156b..70d50de4 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -3,22 +3,20 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceNoCloud
from cloudinit import util
-from cloudinit.tests.helpers import TestCase, populate_dir, mock, ExitStack
+from cloudinit.tests.helpers import CiTestCase, populate_dir, mock, ExitStack
import os
-import shutil
-import tempfile
import textwrap
import yaml
-class TestNoCloudDataSource(TestCase):
+class TestNoCloudDataSource(CiTestCase):
def setUp(self):
super(TestNoCloudDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
self.cmdline = "root=TESTCMDLINE"
@@ -215,7 +213,7 @@ class TestNoCloudDataSource(TestCase):
self.assertNotIn(gateway, str(dsrc.network_config))
-class TestParseCommandLineData(TestCase):
+class TestParseCommandLineData(CiTestCase):
def test_parse_cmdline_data_valid(self):
ds_id = "ds=nocloud"
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index e7d55692..5c3ba012 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -3,12 +3,11 @@
from cloudinit import helpers
from cloudinit.sources import DataSourceOpenNebula as ds
from cloudinit import util
-from cloudinit.tests.helpers import mock, populate_dir, TestCase
+from cloudinit.tests.helpers import mock, populate_dir, CiTestCase
+from textwrap import dedent
import os
import pwd
-import shutil
-import tempfile
import unittest
@@ -32,18 +31,20 @@ USER_DATA = '#cloud-config\napt_upgrade: true'
SSH_KEY = 'ssh-rsa AAAAB3NzaC1....sIkJhq8wdX+4I3A4cYbYP ubuntu@server-460-%i'
HOSTNAME = 'foo.example.com'
PUBLIC_IP = '10.0.0.3'
+MACADDR = '02:00:0a:12:01:01'
+IP_BY_MACADDR = '10.18.1.1'
DS_PATH = "cloudinit.sources.DataSourceOpenNebula"
-class TestOpenNebulaDataSource(TestCase):
+class TestOpenNebulaDataSource(CiTestCase):
parsed_user = None
def setUp(self):
super(TestOpenNebulaDataSource, self).setUp()
- self.tmp = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = helpers.Paths({'cloud_dir': self.tmp})
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
# defaults for few tests
self.ds = ds.DataSourceOpenNebula
@@ -197,24 +198,96 @@ class TestOpenNebulaDataSource(TestCase):
@mock.patch(DS_PATH + ".get_physical_nics_by_mac")
def test_hostname(self, m_get_phys_by_mac):
- m_get_phys_by_mac.return_value = {'02:00:0a:12:01:01': 'eth0'}
- for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'):
- my_d = os.path.join(self.tmp, k)
- populate_context_dir(my_d, {k: PUBLIC_IP})
- results = ds.read_context_disk_dir(my_d)
+ for dev in ('eth0', 'ens3'):
+ m_get_phys_by_mac.return_value = {MACADDR: dev}
+ for k in ('HOSTNAME', 'PUBLIC_IP', 'IP_PUBLIC', 'ETH0_IP'):
+ my_d = os.path.join(self.tmp, k)
+ populate_context_dir(my_d, {k: PUBLIC_IP})
+ results = ds.read_context_disk_dir(my_d)
- self.assertTrue('metadata' in results)
- self.assertTrue('local-hostname' in results['metadata'])
- self.assertEqual(PUBLIC_IP, results['metadata']['local-hostname'])
+ self.assertTrue('metadata' in results)
+ self.assertTrue('local-hostname' in results['metadata'])
+ self.assertEqual(
+ PUBLIC_IP, results['metadata']['local-hostname'])
@mock.patch(DS_PATH + ".get_physical_nics_by_mac")
def test_network_interfaces(self, m_get_phys_by_mac):
- m_get_phys_by_mac.return_value = {'02:00:0a:12:01:01': 'eth0'}
- populate_context_dir(self.seed_dir, {'ETH0_IP': '1.2.3.4'})
- results = ds.read_context_disk_dir(self.seed_dir)
-
- self.assertTrue('network-interfaces' in results)
- self.assertTrue('1.2.3.4' in results['network-interfaces'])
+ for dev in ('eth0', 'ens3'):
+ m_get_phys_by_mac.return_value = {MACADDR: dev}
+
+ # without ETH0_MAC
+ # for Older OpenNebula?
+ populate_context_dir(self.seed_dir, {'ETH0_IP': IP_BY_MACADDR})
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue(IP_BY_MACADDR in results['network-interfaces'])
+
+ # ETH0_IP and ETH0_MAC
+ populate_context_dir(
+ self.seed_dir, {'ETH0_IP': IP_BY_MACADDR, 'ETH0_MAC': MACADDR})
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue(IP_BY_MACADDR in results['network-interfaces'])
+
+ # ETH0_IP with empty string and ETH0_MAC
+ # in the case of using Virtual Network contains
+ # "AR = [ TYPE = ETHER ]"
+ populate_context_dir(
+ self.seed_dir, {'ETH0_IP': '', 'ETH0_MAC': MACADDR})
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue(IP_BY_MACADDR in results['network-interfaces'])
+
+ # ETH0_NETWORK
+ populate_context_dir(
+ self.seed_dir, {
+ 'ETH0_IP': IP_BY_MACADDR,
+ 'ETH0_MAC': MACADDR,
+ 'ETH0_NETWORK': '10.18.0.0'
+ })
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue('10.18.0.0' in results['network-interfaces'])
+
+ # ETH0_NETWORK with empty string
+ populate_context_dir(
+ self.seed_dir, {
+ 'ETH0_IP': IP_BY_MACADDR,
+ 'ETH0_MAC': MACADDR,
+ 'ETH0_NETWORK': ''
+ })
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue('10.18.1.0' in results['network-interfaces'])
+
+ # ETH0_MASK
+ populate_context_dir(
+ self.seed_dir, {
+ 'ETH0_IP': IP_BY_MACADDR,
+ 'ETH0_MAC': MACADDR,
+ 'ETH0_MASK': '255.255.0.0'
+ })
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue('255.255.0.0' in results['network-interfaces'])
+
+ # ETH0_MASK with empty string
+ populate_context_dir(
+ self.seed_dir, {
+ 'ETH0_IP': IP_BY_MACADDR,
+ 'ETH0_MAC': MACADDR,
+ 'ETH0_MASK': ''
+ })
+ results = ds.read_context_disk_dir(self.seed_dir)
+
+ self.assertTrue('network-interfaces' in results)
+ self.assertTrue('255.255.255.0' in results['network-interfaces'])
def test_find_candidates(self):
def my_devs_with(criteria):
@@ -235,7 +308,7 @@ class TestOpenNebulaDataSource(TestCase):
class TestOpenNebulaNetwork(unittest.TestCase):
- system_nics = {'02:00:0a:12:01:01': 'eth0'}
+ system_nics = ('eth0', 'ens3')
def test_lo(self):
net = ds.OpenNebulaNetwork(context={}, system_nics_by_mac={})
@@ -246,45 +319,101 @@ iface lo inet loopback
@mock.patch(DS_PATH + ".get_physical_nics_by_mac")
def test_eth0(self, m_get_phys_by_mac):
- m_get_phys_by_mac.return_value = self.system_nics
- net = ds.OpenNebulaNetwork({})
- self.assertEqual(net.gen_conf(), u'''\
-auto lo
-iface lo inet loopback
-
-auto eth0
-iface eth0 inet static
- address 10.18.1.1
- network 10.18.1.0
- netmask 255.255.255.0
-''')
+ for nic in self.system_nics:
+ m_get_phys_by_mac.return_value = {MACADDR: nic}
+ net = ds.OpenNebulaNetwork({})
+ self.assertEqual(net.gen_conf(), dedent("""\
+ auto lo
+ iface lo inet loopback
+
+ auto {dev}
+ iface {dev} inet static
+ #hwaddress {macaddr}
+ address 10.18.1.1
+ network 10.18.1.0
+ netmask 255.255.255.0
+ """.format(dev=nic, macaddr=MACADDR)))
def test_eth0_override(self):
context = {
'DNS': '1.2.3.8',
- 'ETH0_IP': '1.2.3.4',
- 'ETH0_NETWORK': '1.2.3.0',
+ 'ETH0_IP': '10.18.1.1',
+ 'ETH0_NETWORK': '10.18.0.0',
'ETH0_MASK': '255.255.0.0',
'ETH0_GATEWAY': '1.2.3.5',
'ETH0_DOMAIN': 'example.com',
- 'ETH0_DNS': '1.2.3.6 1.2.3.7'
+ 'ETH0_DNS': '1.2.3.6 1.2.3.7',
+ 'ETH0_MAC': '02:00:0a:12:01:01'
}
-
- net = ds.OpenNebulaNetwork(context,
- system_nics_by_mac=self.system_nics)
- self.assertEqual(net.gen_conf(), u'''\
-auto lo
-iface lo inet loopback
-
-auto eth0
-iface eth0 inet static
- address 1.2.3.4
- network 1.2.3.0
- netmask 255.255.0.0
- gateway 1.2.3.5
- dns-search example.com
- dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7
-''')
+ for nic in self.system_nics:
+ expected = dedent("""\
+ auto lo
+ iface lo inet loopback
+
+ auto {dev}
+ iface {dev} inet static
+ #hwaddress {macaddr}
+ address 10.18.1.1
+ network 10.18.0.0
+ netmask 255.255.0.0
+ gateway 1.2.3.5
+ dns-search example.com
+ dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7
+ """).format(dev=nic, macaddr=MACADDR)
+ net = ds.OpenNebulaNetwork(context,
+ system_nics_by_mac={MACADDR: nic})
+ self.assertEqual(expected, net.gen_conf())
+
+ def test_multiple_nics(self):
+ """Test rendering multiple nics with names that differ from context."""
+ MAC_1 = "02:00:0a:12:01:01"
+ MAC_2 = "02:00:0a:12:01:02"
+ context = {
+ 'DNS': '1.2.3.8',
+ 'ETH0_IP': '10.18.1.1',
+ 'ETH0_NETWORK': '10.18.0.0',
+ 'ETH0_MASK': '255.255.0.0',
+ 'ETH0_GATEWAY': '1.2.3.5',
+ 'ETH0_DOMAIN': 'example.com',
+ 'ETH0_DNS': '1.2.3.6 1.2.3.7',
+ 'ETH0_MAC': MAC_2,
+ 'ETH3_IP': '10.3.1.3',
+ 'ETH3_NETWORK': '10.3.0.0',
+ 'ETH3_MASK': '255.255.0.0',
+ 'ETH3_GATEWAY': '10.3.0.1',
+ 'ETH3_DOMAIN': 'third.example.com',
+ 'ETH3_DNS': '10.3.1.2',
+ 'ETH3_MAC': MAC_1,
+ }
+ net = ds.OpenNebulaNetwork(
+ context, system_nics_by_mac={MAC_1: 'enp0s25', MAC_2: 'enp1s2'})
+
+ expected = dedent("""\
+ auto lo
+ iface lo inet loopback
+
+ auto enp0s25
+ iface enp0s25 inet static
+ #hwaddress 02:00:0a:12:01:01
+ address 10.3.1.3
+ network 10.3.0.0
+ netmask 255.255.0.0
+ gateway 10.3.0.1
+ dns-search third.example.com
+ dns-nameservers 1.2.3.8 10.3.1.2
+
+ auto enp1s2
+ iface enp1s2 inet static
+ #hwaddress 02:00:0a:12:01:02
+ address 10.18.1.1
+ network 10.18.0.0
+ netmask 255.255.0.0
+ gateway 1.2.3.5
+ dns-search example.com
+ dns-nameservers 1.2.3.8 1.2.3.6 1.2.3.7
+ """)
+
+ self.assertEqual(expected, net.gen_conf())
class TestParseShellConfig(unittest.TestCase):
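Aside on the OpenNebula hunks above: the reworked TestOpenNebulaNetwork cases all hinge on naming each interface through the MAC-to-system-name lookup (system_nics_by_mac) rather than the ETHx index, plus the fallbacks the assertions pin down (address derived from the MAC, network derived from the address, 255.255.255.0 when no mask is given). Below is a minimal stand-alone sketch of that behaviour; the helper name is hypothetical and this is not cloud-init's OpenNebulaNetwork implementation.

    def render_eni_sketch(context, system_nics_by_mac):
        # Hypothetical helper: it only mirrors the fallbacks asserted above.
        lines = ['auto lo', 'iface lo inet loopback', '']
        for key in sorted(k for k in context
                          if k.startswith('ETH') and k.endswith('_MAC')):
            prefix, mac = key[:-len('_MAC')], context[key].lower()
            dev = system_nics_by_mac.get(mac)
            if dev is None:
                continue  # context names a MAC this system does not have
            # IP_BY_MACADDR-style fallback: last four MAC bytes, in decimal.
            ip = context.get(prefix + '_IP') or '.'.join(
                str(int(byte, 16)) for byte in mac.split(':')[2:])
            network = (context.get(prefix + '_NETWORK') or
                       ip.rsplit('.', 1)[0] + '.0')
            netmask = context.get(prefix + '_MASK') or '255.255.255.0'
            lines += ['auto %s' % dev,
                      'iface %s inet static' % dev,
                      '  address %s' % ip,
                      '  network %s' % network,
                      '  netmask %s' % netmask,
                      '']
        return '\n'.join(lines)

    # With an empty ETH0_IP the address falls back to 10.18.1.1, matching
    # the IP_BY_MACADDR expectations in the tests above.
    print(render_eni_sketch({'ETH0_IP': '', 'ETH0_MAC': '02:00:0a:12:01:01'},
                            {'02:00:0a:12:01:01': 'ens3'}))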
diff --git a/tests/unittests/test_datasource/test_openstack.py b/tests/unittests/test_datasource/test_openstack.py
index ed367e05..42c31554 100644
--- a/tests/unittests/test_datasource/test_openstack.py
+++ b/tests/unittests/test_datasource/test_openstack.py
@@ -131,6 +131,10 @@ def _read_metadata_service():
class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
VERSION = 'latest'
+ def setUp(self):
+ super(TestOpenStackDataSource, self).setUp()
+ self.tmp = self.tmp_dir()
+
@hp.activate
def test_successful(self):
_register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES)
@@ -232,7 +236,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
_register_uris(self.VERSION, EC2_FILES, EC2_META, OS_FILES)
ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
None,
- helpers.Paths({}))
+ helpers.Paths({'run_dir': self.tmp}))
self.assertIsNone(ds_os.version)
found = ds_os.get_data()
self.assertTrue(found)
@@ -256,7 +260,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
_register_uris(self.VERSION, {}, {}, os_files)
ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
None,
- helpers.Paths({}))
+ helpers.Paths({'run_dir': self.tmp}))
self.assertIsNone(ds_os.version)
found = ds_os.get_data()
self.assertFalse(found)
@@ -271,7 +275,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
_register_uris(self.VERSION, {}, {}, os_files)
ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
None,
- helpers.Paths({}))
+ helpers.Paths({'run_dir': self.tmp}))
ds_os.ds_cfg = {
'max_wait': 0,
'timeout': 0,
@@ -294,7 +298,7 @@ class TestOpenStackDataSource(test_helpers.HttprettyTestCase):
_register_uris(self.VERSION, {}, {}, os_files)
ds_os = ds.DataSourceOpenStack(settings.CFG_BUILTIN,
None,
- helpers.Paths({}))
+ helpers.Paths({'run_dir': self.tmp}))
ds_os.ds_cfg = {
'max_wait': 0,
'timeout': 0,
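The only functional change to the OpenStack tests above is the repeated swap of helpers.Paths({}) for helpers.Paths({'run_dir': self.tmp}): the datasource now needs a writable run directory, so each test hands it a throwaway one. The same pattern, reduced to a minimal sketch that assumes nothing beyond the constructor usage already shown in this diff:

    import tempfile

    from cloudinit import helpers

    # Give each test its own scratch run_dir so nothing is written to the
    # system's real /run/cloud-init during a unit test run.
    tmp = tempfile.mkdtemp()
    paths = helpers.Paths({'run_dir': tmp})

    # The datasource under test is then built against these paths, e.g.
    #   ds.DataSourceOpenStack(settings.CFG_BUILTIN, None, paths)
    print('tests will write under %s' % tmp)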
diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py
index 700da86c..fc4eb36e 100644
--- a/tests/unittests/test_datasource/test_ovf.py
+++ b/tests/unittests/test_datasource/test_ovf.py
@@ -5,11 +5,17 @@
# This file is part of cloud-init. See LICENSE file for license information.
import base64
-from collections import OrderedDict
+import os
-from cloudinit.tests import helpers as test_helpers
+from collections import OrderedDict
+from textwrap import dedent
+from cloudinit import util
+from cloudinit.tests.helpers import CiTestCase, wrap_and_call
+from cloudinit.helpers import Paths
from cloudinit.sources import DataSourceOVF as dsovf
+from cloudinit.sources.helpers.vmware.imc.config_custom_script import (
+ CustomScriptNotFound)
OVF_ENV_CONTENT = """<?xml version="1.0" encoding="UTF-8"?>
<Environment xmlns="http://schemas.dmtf.org/ovf/environment/1"
@@ -42,7 +48,7 @@ def fill_properties(props, template=OVF_ENV_CONTENT):
return template.format(properties=properties)
-class TestReadOvfEnv(test_helpers.TestCase):
+class TestReadOvfEnv(CiTestCase):
def test_with_b64_userdata(self):
user_data = "#!/bin/sh\necho hello world\n"
user_data_b64 = base64.b64encode(user_data.encode()).decode()
@@ -72,7 +78,104 @@ class TestReadOvfEnv(test_helpers.TestCase):
self.assertIsNone(ud)
-class TestTransportIso9660(test_helpers.CiTestCase):
+class TestMarkerFiles(CiTestCase):
+
+ def setUp(self):
+ super(TestMarkerFiles, self).setUp()
+ self.tdir = self.tmp_dir()
+
+ def test_false_when_markerid_none(self):
+ """Return False when markerid provided is None."""
+ self.assertFalse(
+ dsovf.check_marker_exists(markerid=None, marker_dir=self.tdir))
+
+ def test_markerid_file_exist(self):
+ """Return False when markerid file path does not exist,
+ True otherwise."""
+ self.assertFalse(
+ dsovf.check_marker_exists('123', self.tdir))
+
+ marker_file = self.tmp_path('.markerfile-123.txt', self.tdir)
+ util.write_file(marker_file, '')
+ self.assertTrue(
+ dsovf.check_marker_exists('123', self.tdir)
+ )
+
+ def test_marker_file_setup(self):
+ """Test creation of marker files."""
+ markerfilepath = self.tmp_path('.markerfile-hi.txt', self.tdir)
+ self.assertFalse(os.path.exists(markerfilepath))
+ dsovf.setup_marker_files(markerid='hi', marker_dir=self.tdir)
+ self.assertTrue(os.path.exists(markerfilepath))
+
+
+class TestDatasourceOVF(CiTestCase):
+
+ with_logs = True
+
+ def setUp(self):
+ super(TestDatasourceOVF, self).setUp()
+ self.datasource = dsovf.DataSourceOVF
+ self.tdir = self.tmp_dir()
+
+ def test_get_data_false_on_none_dmi_data(self):
+ """When dmi for system-product-name is None, get_data returns False."""
+ paths = Paths({'seed_dir': self.tdir})
+ ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
+ retcode = wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'util.read_dmi_data': None},
+ ds.get_data)
+ self.assertFalse(retcode, 'Expected False return from ds.get_data')
+ self.assertIn(
+ 'DEBUG: No system-product-name found', self.logs.getvalue())
+
+ def test_get_data_no_vmware_customization_disabled(self):
+ """When vmware customization is disabled via sys_cfg log a message."""
+ paths = Paths({'seed_dir': self.tdir})
+ ds = self.datasource(
+ sys_cfg={'disable_vmware_customization': True}, distro={},
+ paths=paths)
+ retcode = wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'util.read_dmi_data': 'vmware'},
+ ds.get_data)
+ self.assertFalse(retcode, 'Expected False return from ds.get_data')
+ self.assertIn(
+ 'DEBUG: Customization for VMware platform is disabled.',
+ self.logs.getvalue())
+
+ def test_get_data_vmware_customization_enabled(self):
+ """When the cloud-init workflow for vmware is enabled via sys_cfg,
+ customization is attempted (here the missing custom script raises
+ CustomScriptNotFound).
+ """
+ paths = Paths({'seed_dir': self.tdir})
+ ds = self.datasource(
+ sys_cfg={'disable_vmware_customization': False}, distro={},
+ paths=paths)
+ conf_file = self.tmp_path('test-cust', self.tdir)
+ conf_content = dedent("""\
+ [CUSTOM-SCRIPT]
+ SCRIPT-NAME = test-script
+ [MISC]
+ MARKER-ID = 12345345
+ """)
+ util.write_file(conf_file, conf_content)
+ with self.assertRaises(CustomScriptNotFound) as context:
+ wrap_and_call(
+ 'cloudinit.sources.DataSourceOVF',
+ {'util.read_dmi_data': 'vmware',
+ 'util.del_dir': True,
+ 'search_file': self.tdir,
+ 'wait_for_imc_cfg_file': conf_file,
+ 'get_nics_to_enable': ''},
+ ds.get_data)
+ customscript = self.tmp_path('test-script', self.tdir)
+ self.assertIn('Script %s not found!!' % customscript,
+ str(context.exception))
+
+
+class TestTransportIso9660(CiTestCase):
def setUp(self):
super(TestTransportIso9660, self).setUp()
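The new TestMarkerFiles cases above pin down the marker convention: a customization ID counts as already applied once a file named '.markerfile-<ID>.txt' exists in the marker directory. A stand-alone sketch of that convention follows; the helper names are hypothetical stand-ins, not the dsovf.check_marker_exists / setup_marker_files implementations.

    import os
    import tempfile


    def _marker_path(marker_dir, markerid):
        # File name format taken from the fixtures in TestMarkerFiles above.
        return os.path.join(marker_dir, '.markerfile-%s.txt' % markerid)


    def marker_exists(markerid, marker_dir):
        if not markerid:
            return False  # no ID means nothing could have been recorded
        return os.path.exists(_marker_path(marker_dir, markerid))


    def write_marker(markerid, marker_dir):
        open(_marker_path(marker_dir, markerid), 'w').close()


    marker_dir = tempfile.mkdtemp()
    print(marker_exists('12345345', marker_dir))   # False before customization
    write_marker('12345345', marker_dir)
    print(marker_exists('12345345', marker_dir))   # True on a later pass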
diff --git a/tests/unittests/test_datasource/test_scaleway.py b/tests/unittests/test_datasource/test_scaleway.py
index 436df9ee..8dec06b1 100644
--- a/tests/unittests/test_datasource/test_scaleway.py
+++ b/tests/unittests/test_datasource/test_scaleway.py
@@ -9,7 +9,7 @@ from cloudinit import helpers
from cloudinit import settings
from cloudinit.sources import DataSourceScaleway
-from cloudinit.tests.helpers import mock, HttprettyTestCase, TestCase
+from cloudinit.tests.helpers import mock, HttprettyTestCase, CiTestCase
class DataResponses(object):
@@ -63,7 +63,11 @@ class MetadataResponses(object):
return 200, headers, json.dumps(cls.FAKE_METADATA)
-class TestOnScaleway(TestCase):
+class TestOnScaleway(CiTestCase):
+
+ def setUp(self):
+ super(TestOnScaleway, self).setUp()
+ self.tmp = self.tmp_dir()
def install_mocks(self, fake_dmi, fake_file_exists, fake_cmdline):
mock, faked = fake_dmi
@@ -91,7 +95,7 @@ class TestOnScaleway(TestCase):
# When not on Scaleway, get_data() returns False.
datasource = DataSourceScaleway.DataSourceScaleway(
- settings.CFG_BUILTIN, None, helpers.Paths({})
+ settings.CFG_BUILTIN, None, helpers.Paths({'run_dir': self.tmp})
)
self.assertFalse(datasource.get_data())
@@ -159,8 +163,9 @@ def get_source_address_adapter(*args, **kwargs):
class TestDataSourceScaleway(HttprettyTestCase):
def setUp(self):
+ tmp = self.tmp_dir()
self.datasource = DataSourceScaleway.DataSourceScaleway(
- settings.CFG_BUILTIN, None, helpers.Paths({})
+ settings.CFG_BUILTIN, None, helpers.Paths({'run_dir': tmp})
)
super(TestDataSourceScaleway, self).setUp()
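The Scaleway edits above are mostly mechanical (CiTestCase for tmp_dir(), a real run_dir handed to Paths), but the install_mocks(fake_dmi, fake_file_exists, fake_cmdline) signature shows the three kinds of probe the platform check is mocked around. Purely as an illustration of that shape, and not a restatement of DataSourceScaleway's actual checks, a hypothetical detector could combine them like this:

    import os


    def looks_like_scaleway(dmi_vendor, marker_file, kernel_cmdline):
        """Hypothetical: any single positive probe claims the platform."""
        if dmi_vendor == 'Scaleway':
            return True                           # DMI-based probe
        if os.path.exists(marker_file):
            return True                           # filesystem marker probe
        return 'scaleway' in kernel_cmdline       # kernel command line probe


    print(looks_like_scaleway('Scaleway', '/nonexistent', ''))      # True
    print(looks_like_scaleway('SomeVendor', '/nonexistent', ''))    # False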
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 933d5b63..88bae5f9 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -359,7 +359,8 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
self.tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tmp)
- self.paths = c_helpers.Paths({'cloud_dir': self.tmp})
+ self.paths = c_helpers.Paths(
+ {'cloud_dir': self.tmp, 'run_dir': self.tmp})
self.legacy_user_d = os.path.join(self.tmp, 'legacy_user_tmp')
os.mkdir(self.legacy_user_d)