summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorKim Hagen <kim.sidney@gmail.com>2019-01-23 13:20:25 +0100
committerKim Hagen <kim.sidney@gmail.com>2019-01-23 13:20:25 +0100
commitc4da1a5cddacaa7c2a15df9170f21c3fce78fb15 (patch)
treeb6abeebd0789141fea8696e2df9aa3db5663ebf0 /tests
parentb120f4f7a670674779a93f8c882c81f44a993888 (diff)
parent45d731a61a07447521d56e8ce4f19ebfeba2aa78 (diff)
downloadvyos-cloud-init-c4da1a5cddacaa7c2a15df9170f21c3fce78fb15.tar.gz
vyos-cloud-init-c4da1a5cddacaa7c2a15df9170f21c3fce78fb15.zip
Merge tag '18.5' into current
release 18.5 Bump the version on cloudinit/version.py to be 18.5 and update ChangeLog LP: #1808380 Conflicts: config/cloud.cfg.tmpl
Diffstat (limited to 'tests')
-rw-r--r--tests/cloud_tests/releases.yaml16
-rw-r--r--tests/cloud_tests/testcases/base.py18
-rw-r--r--tests/cloud_tests/testcases/modules/apt_configure_primary.py14
-rw-r--r--tests/cloud_tests/testcases/modules/apt_configure_primary.yaml7
-rw-r--r--tests/unittests/test_builtin_handlers.py25
-rw-r--r--tests/unittests/test_cli.py16
-rw-r--r--tests/unittests/test_datasource/test_aliyun.py4
-rw-r--r--tests/unittests/test_datasource/test_altcloud.py118
-rw-r--r--tests/unittests/test_datasource/test_azure.py341
-rw-r--r--tests/unittests/test_datasource/test_cloudsigma.py6
-rw-r--r--tests/unittests/test_datasource/test_configdrive.py3
-rw-r--r--tests/unittests/test_datasource/test_ec2.py60
-rw-r--r--tests/unittests/test_datasource/test_ibmcloud.py40
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py139
-rw-r--r--tests/unittests/test_datasource/test_opennebula.py4
-rw-r--r--tests/unittests/test_datasource/test_ovf.py52
-rw-r--r--tests/unittests/test_datasource/test_smartos.py7
-rw-r--r--tests/unittests/test_ds_identify.py2
-rw-r--r--tests/unittests/test_handler/test_handler_resizefs.py52
-rw-r--r--tests/unittests/test_handler/test_handler_write_files.py12
-rw-r--r--tests/unittests/test_net.py64
-rw-r--r--tests/unittests/test_vmware_config_file.py58
22 files changed, 813 insertions, 245 deletions
diff --git a/tests/cloud_tests/releases.yaml b/tests/cloud_tests/releases.yaml
index defae02b..ec5da724 100644
--- a/tests/cloud_tests/releases.yaml
+++ b/tests/cloud_tests/releases.yaml
@@ -129,6 +129,22 @@ features:
releases:
# UBUNTU =================================================================
+ disco:
+ # EOL: Jan 2020
+ default:
+ enabled: true
+ release: disco
+ version: 19.04
+ os: ubuntu
+ feature_groups:
+ - base
+ - debian_base
+ - ubuntu_specific
+ lxd:
+ sstreams_server: https://cloud-images.ubuntu.com/daily
+ alias: disco
+ setup_overrides: null
+ override_templates: false
cosmic:
# EOL: Jul 2019
default:
diff --git a/tests/cloud_tests/testcases/base.py b/tests/cloud_tests/testcases/base.py
index e18d601c..fd12d87b 100644
--- a/tests/cloud_tests/testcases/base.py
+++ b/tests/cloud_tests/testcases/base.py
@@ -177,7 +177,7 @@ class CloudTestCase(unittest2.TestCase):
instance_data['base64_encoded_keys'])
ds = instance_data.get('ds', {})
v1_data = instance_data.get('v1', {})
- metadata = ds.get('meta_data', {})
+ metadata = ds.get('meta-data', {})
macs = metadata.get(
'network', {}).get('interfaces', {}).get('macs', {})
if not macs:
@@ -195,6 +195,9 @@ class CloudTestCase(unittest2.TestCase):
self.assertIsNotNone(
v1_data['availability_zone'], 'expected ec2 availability_zone')
self.assertEqual('aws', v1_data['cloud_name'])
+ self.assertEqual('ec2', v1_data['platform'])
+ self.assertEqual(
+ 'metadata (http://169.254.169.254)', v1_data['subplatform'])
self.assertIn('i-', v1_data['instance_id'])
self.assertIn('ip-', v1_data['local_hostname'])
self.assertIsNotNone(v1_data['region'], 'expected ec2 region')
@@ -220,7 +223,11 @@ class CloudTestCase(unittest2.TestCase):
instance_data = json.loads(out)
v1_data = instance_data.get('v1', {})
self.assertItemsEqual([], sorted(instance_data['base64_encoded_keys']))
- self.assertEqual('nocloud', v1_data['cloud_name'])
+ self.assertEqual('unknown', v1_data['cloud_name'])
+ self.assertEqual('lxd', v1_data['platform'])
+ self.assertEqual(
+ 'seed-dir (/var/lib/cloud/seed/nocloud-net)',
+ v1_data['subplatform'])
self.assertIsNone(
v1_data['availability_zone'],
'found unexpected lxd availability_zone %s' %
@@ -253,7 +260,12 @@ class CloudTestCase(unittest2.TestCase):
instance_data = json.loads(out)
v1_data = instance_data.get('v1', {})
self.assertItemsEqual([], instance_data['base64_encoded_keys'])
- self.assertEqual('nocloud', v1_data['cloud_name'])
+ self.assertEqual('unknown', v1_data['cloud_name'])
+ self.assertEqual('nocloud', v1_data['platform'])
+ subplatform = v1_data['subplatform']
+ self.assertIsNotNone(
+ re.match(r'config-disk \(\/dev\/[a-z]{3}\)', subplatform),
+ 'kvm subplatform "%s" != "config-disk (/dev/...)"' % subplatform)
self.assertIsNone(
v1_data['availability_zone'],
'found unexpected kvm availability_zone %s' %
diff --git a/tests/cloud_tests/testcases/modules/apt_configure_primary.py b/tests/cloud_tests/testcases/modules/apt_configure_primary.py
index c1c4bbc0..4950a2ef 100644
--- a/tests/cloud_tests/testcases/modules/apt_configure_primary.py
+++ b/tests/cloud_tests/testcases/modules/apt_configure_primary.py
@@ -9,12 +9,16 @@ class TestAptconfigurePrimary(base.CloudTestCase):
def test_ubuntu_sources(self):
"""Test no default Ubuntu entries exist."""
- out = self.get_data_file('ubuntu.sources.list')
- self.assertEqual(0, int(out))
+ out = self.get_data_file('sources.list')
+ ubuntu_source_count = len(
+ [line for line in out.split('\n') if 'archive.ubuntu.com' in line])
+ self.assertEqual(0, ubuntu_source_count)
def test_gatech_sources(self):
- """Test GaTech entires exist."""
- out = self.get_data_file('gatech.sources.list')
- self.assertEqual(20, int(out))
+ """Test GaTech entries exist."""
+ out = self.get_data_file('sources.list')
+ gatech_source_count = len(
+ [line for line in out.split('\n') if 'gtlib.gatech.edu' in line])
+ self.assertGreater(gatech_source_count, 0)
# vi: ts=4 expandtab
diff --git a/tests/cloud_tests/testcases/modules/apt_configure_primary.yaml b/tests/cloud_tests/testcases/modules/apt_configure_primary.yaml
index 41bcf2fd..cc067d4f 100644
--- a/tests/cloud_tests/testcases/modules/apt_configure_primary.yaml
+++ b/tests/cloud_tests/testcases/modules/apt_configure_primary.yaml
@@ -12,13 +12,6 @@ cloud_config: |
- default
uri: "http://www.gtlib.gatech.edu/pub/ubuntu-releases/"
collect_scripts:
- ubuntu.sources.list: |
- #!/bin/bash
- grep -v '^#' /etc/apt/sources.list | sed '/^\s*$/d' | grep -c archive.ubuntu.com
- gatech.sources.list: |
- #!/bin/bash
- grep -v '^#' /etc/apt/sources.list | sed '/^\s*$/d' | grep -c gtlib.gatech.edu
-
sources.list: |
#!/bin/bash
cat /etc/apt/sources.list
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py
index abe820e1..b92ffc79 100644
--- a/tests/unittests/test_builtin_handlers.py
+++ b/tests/unittests/test_builtin_handlers.py
@@ -3,6 +3,7 @@
"""Tests of the built-in user data handlers."""
import copy
+import errno
import os
import shutil
import tempfile
@@ -202,6 +203,30 @@ class TestJinjaTemplatePartHandler(CiTestCase):
os.path.exists(script_file),
'Unexpected file created %s' % script_file)
+ def test_jinja_template_handle_errors_on_unreadable_instance_data(self):
+ """If instance-data is unreadable, raise an error from handle_part."""
+ script_handler = ShellScriptPartHandler(self.paths)
+ instance_json = os.path.join(self.run_dir, 'instance-data.json')
+ util.write_file(instance_json, util.json_dumps({}))
+ h = JinjaTemplatePartHandler(
+ self.paths, sub_handlers=[script_handler])
+ with mock.patch(self.mpath + 'load_file') as m_load:
+ with self.assertRaises(RuntimeError) as context_manager:
+ m_load.side_effect = OSError(errno.EACCES, 'Not allowed')
+ h.handle_part(
+ data='data', ctype="!" + handlers.CONTENT_START,
+ filename='part01',
+ payload='## template: jinja \n#!/bin/bash\necho himom',
+ frequency='freq', headers='headers')
+ script_file = os.path.join(script_handler.script_dir, 'part01')
+ self.assertEqual(
+ 'Cannot render jinja template vars. No read permission on'
+ " '{rdir}/instance-data.json'. Try sudo".format(rdir=self.run_dir),
+ str(context_manager.exception))
+ self.assertFalse(
+ os.path.exists(script_file),
+ 'Unexpected file created %s' % script_file)
+
@skipUnlessJinja()
def test_jinja_template_handle_renders_jinja_content(self):
"""When present, render jinja variables from instance-data.json."""
diff --git a/tests/unittests/test_cli.py b/tests/unittests/test_cli.py
index 199d69b0..d283f136 100644
--- a/tests/unittests/test_cli.py
+++ b/tests/unittests/test_cli.py
@@ -246,18 +246,18 @@ class TestCLI(test_helpers.FilesystemMockingTestCase):
self.assertEqual('cc_ntp', parseargs.name)
self.assertFalse(parseargs.report)
- @mock.patch('cloudinit.cmd.main.dhclient_hook')
- def test_dhclient_hook_subcommand(self, m_dhclient_hook):
+ @mock.patch('cloudinit.cmd.main.dhclient_hook.handle_args')
+ def test_dhclient_hook_subcommand(self, m_handle_args):
"""The subcommand 'dhclient-hook' calls dhclient_hook with args."""
- self._call_main(['cloud-init', 'dhclient-hook', 'net_action', 'eth0'])
- (name, parseargs) = m_dhclient_hook.call_args_list[0][0]
- self.assertEqual('dhclient_hook', name)
+ self._call_main(['cloud-init', 'dhclient-hook', 'up', 'eth0'])
+ (name, parseargs) = m_handle_args.call_args_list[0][0]
+ self.assertEqual('dhclient-hook', name)
self.assertEqual('dhclient-hook', parseargs.subcommand)
- self.assertEqual('dhclient_hook', parseargs.action[0])
+ self.assertEqual('dhclient-hook', parseargs.action[0])
self.assertFalse(parseargs.debug)
self.assertFalse(parseargs.force)
- self.assertEqual('net_action', parseargs.net_action)
- self.assertEqual('eth0', parseargs.net_interface)
+ self.assertEqual('up', parseargs.event)
+ self.assertEqual('eth0', parseargs.interface)
@mock.patch('cloudinit.cmd.main.main_features')
def test_features_hook_subcommand(self, m_features):
diff --git a/tests/unittests/test_datasource/test_aliyun.py b/tests/unittests/test_datasource/test_aliyun.py
index 1e77842f..e9213ca1 100644
--- a/tests/unittests/test_datasource/test_aliyun.py
+++ b/tests/unittests/test_datasource/test_aliyun.py
@@ -140,6 +140,10 @@ class TestAliYunDatasource(test_helpers.HttprettyTestCase):
self._test_get_sshkey()
self._test_get_iid()
self._test_host_name()
+ self.assertEqual('aliyun', self.ds.cloud_name)
+ self.assertEqual('ec2', self.ds.platform)
+ self.assertEqual(
+ 'metadata (http://100.100.100.200)', self.ds.subplatform)
@mock.patch("cloudinit.sources.DataSourceAliYun._is_aliyun")
def test_returns_false_when_not_on_aliyun(self, m_is_aliyun):
diff --git a/tests/unittests/test_datasource/test_altcloud.py b/tests/unittests/test_datasource/test_altcloud.py
index ff35904e..3119bfac 100644
--- a/tests/unittests/test_datasource/test_altcloud.py
+++ b/tests/unittests/test_datasource/test_altcloud.py
@@ -10,7 +10,6 @@
This test file exercises the code in sources DataSourceAltCloud.py
'''
-import mock
import os
import shutil
import tempfile
@@ -18,32 +17,13 @@ import tempfile
from cloudinit import helpers
from cloudinit import util
-from cloudinit.tests.helpers import CiTestCase
+from cloudinit.tests.helpers import CiTestCase, mock
import cloudinit.sources.DataSourceAltCloud as dsac
OS_UNAME_ORIG = getattr(os, 'uname')
-def _write_cloud_info_file(value):
- '''
- Populate the CLOUD_INFO_FILE which would be populated
- with a cloud backend identifier ImageFactory when building
- an image with ImageFactory.
- '''
- cifile = open(dsac.CLOUD_INFO_FILE, 'w')
- cifile.write(value)
- cifile.close()
- os.chmod(dsac.CLOUD_INFO_FILE, 0o664)
-
-
-def _remove_cloud_info_file():
- '''
- Remove the test CLOUD_INFO_FILE
- '''
- os.remove(dsac.CLOUD_INFO_FILE)
-
-
def _write_user_data_files(mount_dir, value):
'''
Populate the deltacloud_user_data_file the user_data_file
@@ -98,13 +78,15 @@ def _dmi_data(expected):
class TestGetCloudType(CiTestCase):
- '''
- Test to exercise method: DataSourceAltCloud.get_cloud_type()
- '''
+ '''Test to exercise method: DataSourceAltCloud.get_cloud_type()'''
+
+ with_logs = True
def setUp(self):
'''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ super(TestGetCloudType, self).setUp()
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.dmi_data = util.read_dmi_data
# We have a different code path for arm to deal with LP1243287
# We have to switch arch to x86_64 to avoid test failure
@@ -115,6 +97,26 @@ class TestGetCloudType(CiTestCase):
util.read_dmi_data = self.dmi_data
force_arch()
+ def test_cloud_info_file_ioerror(self):
+ """Return UNKNOWN when /etc/sysconfig/cloud-info exists but errors."""
+ self.assertEqual('/etc/sysconfig/cloud-info', dsac.CLOUD_INFO_FILE)
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
+ # Attempting to read the directory generates IOError
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.tmp):
+ self.assertEqual('UNKNOWN', dsrc.get_cloud_type())
+ self.assertIn(
+ "[Errno 21] Is a directory: '%s'" % self.tmp,
+ self.logs.getvalue())
+
+ def test_cloud_info_file(self):
+ """Return uppercase stripped content from /etc/sysconfig/cloud-info."""
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
+ cloud_info = self.tmp_path('cloud-info', dir=self.tmp)
+ util.write_file(cloud_info, ' OverRiDdeN CloudType ')
+ # Attempting to read the directory generates IOError
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', cloud_info):
+ self.assertEqual('OVERRIDDEN CLOUDTYPE', dsrc.get_cloud_type())
+
def test_rhev(self):
'''
Test method get_cloud_type() for RHEVm systems.
@@ -153,60 +155,57 @@ class TestGetDataCloudInfoFile(CiTestCase):
self.tmp = self.tmp_dir()
self.paths = helpers.Paths(
{'cloud_dir': self.tmp, 'run_dir': self.tmp})
- self.cloud_info_file = tempfile.mkstemp()[1]
- self.dmi_data = util.read_dmi_data
- dsac.CLOUD_INFO_FILE = self.cloud_info_file
-
- def tearDown(self):
- # Reset
-
- # Attempt to remove the temp file ignoring errors
- try:
- os.remove(self.cloud_info_file)
- except OSError:
- pass
-
- util.read_dmi_data = self.dmi_data
- dsac.CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
+ self.cloud_info_file = self.tmp_path('cloud-info', dir=self.tmp)
def test_rhev(self):
'''Success Test module get_data() forcing RHEV.'''
- _write_cloud_info_file('RHEV')
+ util.write_file(self.cloud_info_file, 'RHEV')
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda: True
- self.assertEqual(True, dsrc.get_data())
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file):
+ self.assertEqual(True, dsrc.get_data())
+ self.assertEqual('altcloud', dsrc.cloud_name)
+ self.assertEqual('altcloud', dsrc.platform_type)
+ self.assertEqual('rhev (/dev/fd0)', dsrc.subplatform)
def test_vsphere(self):
'''Success Test module get_data() forcing VSPHERE.'''
- _write_cloud_info_file('VSPHERE')
+ util.write_file(self.cloud_info_file, 'VSPHERE')
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda: True
- self.assertEqual(True, dsrc.get_data())
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file):
+ self.assertEqual(True, dsrc.get_data())
+ self.assertEqual('altcloud', dsrc.cloud_name)
+ self.assertEqual('altcloud', dsrc.platform_type)
+ self.assertEqual('vsphere (unknown)', dsrc.subplatform)
def test_fail_rhev(self):
'''Failure Test module get_data() forcing RHEV.'''
- _write_cloud_info_file('RHEV')
+ util.write_file(self.cloud_info_file, 'RHEV')
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda: False
- self.assertEqual(False, dsrc.get_data())
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file):
+ self.assertEqual(False, dsrc.get_data())
def test_fail_vsphere(self):
'''Failure Test module get_data() forcing VSPHERE.'''
- _write_cloud_info_file('VSPHERE')
+ util.write_file(self.cloud_info_file, 'VSPHERE')
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda: False
- self.assertEqual(False, dsrc.get_data())
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file):
+ self.assertEqual(False, dsrc.get_data())
def test_unrecognized(self):
'''Failure Test module get_data() forcing unrecognized.'''
- _write_cloud_info_file('unrecognized')
+ util.write_file(self.cloud_info_file, 'unrecognized')
dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
- self.assertEqual(False, dsrc.get_data())
+ with mock.patch.object(dsac, 'CLOUD_INFO_FILE', self.cloud_info_file):
+ self.assertEqual(False, dsrc.get_data())
class TestGetDataNoCloudInfoFile(CiTestCase):
@@ -322,7 +321,8 @@ class TestUserDataVsphere(CiTestCase):
'''
def setUp(self):
'''Set up.'''
- self.paths = helpers.Paths({'cloud_dir': '/tmp'})
+ self.tmp = self.tmp_dir()
+ self.paths = helpers.Paths({'cloud_dir': self.tmp})
self.mount_dir = tempfile.mkdtemp()
_write_user_data_files(self.mount_dir, 'test user data')
@@ -363,6 +363,22 @@ class TestUserDataVsphere(CiTestCase):
self.assertEqual(1, m_find_devs_with.call_count)
self.assertEqual(1, m_mount_cb.call_count)
+ @mock.patch("cloudinit.sources.DataSourceAltCloud.util.find_devs_with")
+ @mock.patch("cloudinit.sources.DataSourceAltCloud.util.mount_cb")
+ def test_user_data_vsphere_success(self, m_mount_cb, m_find_devs_with):
+ """Test user_data_vsphere() where successful."""
+ m_find_devs_with.return_value = ["/dev/mock/cdrom"]
+ m_mount_cb.return_value = 'raw userdata from cdrom'
+ dsrc = dsac.DataSourceAltCloud({}, None, self.paths)
+ cloud_info = self.tmp_path('cloud-info', dir=self.tmp)
+ util.write_file(cloud_info, 'VSPHERE')
+ self.assertEqual(True, dsrc.user_data_vsphere())
+ m_find_devs_with.assert_called_once_with('LABEL=CDROM')
+ m_mount_cb.assert_called_once_with(
+ '/dev/mock/cdrom', dsac.read_user_data_callback)
+ with mock.patch.object(dsrc, 'get_cloud_type', return_value='VSPHERE'):
+ self.assertEqual('vsphere (/dev/mock/cdrom)', dsrc.subplatform)
+
class TestReadUserDataCallback(CiTestCase):
'''
diff --git a/tests/unittests/test_datasource/test_azure.py b/tests/unittests/test_datasource/test_azure.py
index 4e428b71..417d86a9 100644
--- a/tests/unittests/test_datasource/test_azure.py
+++ b/tests/unittests/test_datasource/test_azure.py
@@ -17,6 +17,7 @@ import crypt
import httpretty
import json
import os
+import requests
import stat
import xml.etree.ElementTree as ET
import yaml
@@ -110,6 +111,8 @@ NETWORK_METADATA = {
}
}
+MOCKPATH = 'cloudinit.sources.DataSourceAzure.'
+
class TestGetMetadataFromIMDS(HttprettyTestCase):
@@ -119,9 +122,9 @@ class TestGetMetadataFromIMDS(HttprettyTestCase):
super(TestGetMetadataFromIMDS, self).setUp()
self.network_md_url = dsaz.IMDS_URL + "instance?api-version=2017-12-01"
- @mock.patch('cloudinit.sources.DataSourceAzure.readurl')
- @mock.patch('cloudinit.sources.DataSourceAzure.EphemeralDHCPv4')
- @mock.patch('cloudinit.sources.DataSourceAzure.net.is_up')
+ @mock.patch(MOCKPATH + 'readurl')
+ @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
+ @mock.patch(MOCKPATH + 'net.is_up')
def test_get_metadata_does_not_dhcp_if_network_is_up(
self, m_net_is_up, m_dhcp, m_readurl):
"""Do not perform DHCP setup when nic is already up."""
@@ -138,9 +141,9 @@ class TestGetMetadataFromIMDS(HttprettyTestCase):
"Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
self.logs.getvalue())
- @mock.patch('cloudinit.sources.DataSourceAzure.readurl')
- @mock.patch('cloudinit.sources.DataSourceAzure.EphemeralDHCPv4')
- @mock.patch('cloudinit.sources.DataSourceAzure.net.is_up')
+ @mock.patch(MOCKPATH + 'readurl')
+ @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
+ @mock.patch(MOCKPATH + 'net.is_up')
def test_get_metadata_performs_dhcp_when_network_is_down(
self, m_net_is_up, m_dhcp, m_readurl):
"""Perform DHCP setup when nic is not up."""
@@ -163,7 +166,7 @@ class TestGetMetadataFromIMDS(HttprettyTestCase):
headers={'Metadata': 'true'}, retries=2, timeout=1)
@mock.patch('cloudinit.url_helper.time.sleep')
- @mock.patch('cloudinit.sources.DataSourceAzure.net.is_up')
+ @mock.patch(MOCKPATH + 'net.is_up')
def test_get_metadata_from_imds_empty_when_no_imds_present(
self, m_net_is_up, m_sleep):
"""Return empty dict when IMDS network metadata is absent."""
@@ -182,6 +185,35 @@ class TestGetMetadataFromIMDS(HttprettyTestCase):
"Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
self.logs.getvalue())
+ @mock.patch('requests.Session.request')
+ @mock.patch('cloudinit.url_helper.time.sleep')
+ @mock.patch(MOCKPATH + 'net.is_up')
+ def test_get_metadata_from_imds_retries_on_timeout(
+ self, m_net_is_up, m_sleep, m_request):
+ """Retry IMDS network metadata on timeout errors."""
+
+ self.attempt = 0
+ m_request.side_effect = requests.Timeout('Fake Connection Timeout')
+
+ def retry_callback(request, uri, headers):
+ self.attempt += 1
+ raise requests.Timeout('Fake connection timeout')
+
+ httpretty.register_uri(
+ httpretty.GET,
+ dsaz.IMDS_URL + 'instance?api-version=2017-12-01',
+ body=retry_callback)
+
+ m_net_is_up.return_value = True # skips dhcp
+
+ self.assertEqual({}, dsaz.get_metadata_from_imds('eth9', retries=3))
+
+ m_net_is_up.assert_called_with('eth9')
+ self.assertEqual([mock.call(1)]*3, m_sleep.call_args_list)
+ self.assertIn(
+ "Crawl of Azure Instance Metadata Service (IMDS) took", # log_time
+ self.logs.getvalue())
+
class TestAzureDataSource(CiTestCase):
@@ -254,7 +286,8 @@ scbus-1 on xpt0 bus 0
])
return dsaz
- def _get_ds(self, data, agent_command=None, distro=None):
+ def _get_ds(self, data, agent_command=None, distro=None,
+ apply_network=None):
def dsdevs():
return data.get('dsdevs', [])
@@ -310,6 +343,8 @@ scbus-1 on xpt0 bus 0
data.get('sys_cfg', {}), distro=distro, paths=self.paths)
if agent_command is not None:
dsrc.ds_cfg['agent_command'] = agent_command
+ if apply_network is not None:
+ dsrc.ds_cfg['apply_network_config'] = apply_network
return dsrc
@@ -380,7 +415,7 @@ fdescfs /dev/fd fdescfs rw 0 0
res = get_path_dev_freebsd('/etc', mnt_list)
self.assertIsNotNone(res)
- @mock.patch('cloudinit.sources.DataSourceAzure._is_platform_viable')
+ @mock.patch(MOCKPATH + '_is_platform_viable')
def test_call_is_platform_viable_seed(self, m_is_platform_viable):
"""Check seed_dir using _is_platform_viable and return False."""
# Return a non-matching asset tag value
@@ -401,6 +436,24 @@ fdescfs /dev/fd fdescfs rw 0 0
self.assertEqual(dsrc.metadata['local-hostname'], odata['HostName'])
self.assertTrue(os.path.isfile(
os.path.join(self.waagent_d, 'ovf-env.xml')))
+ self.assertEqual('azure', dsrc.cloud_name)
+ self.assertEqual('azure', dsrc.platform_type)
+ self.assertEqual(
+ 'seed-dir (%s/seed/azure)' % self.tmp, dsrc.subplatform)
+
+ def test_basic_dev_file(self):
+ """When a device path is used, present that in subplatform."""
+ data = {'sys_cfg': {}, 'dsdevs': ['/dev/cd0']}
+ dsrc = self._get_ds(data)
+ with mock.patch(MOCKPATH + 'util.mount_cb') as m_mount_cb:
+ m_mount_cb.return_value = (
+ {'local-hostname': 'me'}, 'ud', {'cfg': ''}, {})
+ self.assertTrue(dsrc.get_data())
+ self.assertEqual(dsrc.userdata_raw, 'ud')
+ self.assertEqual(dsrc.metadata['local-hostname'], 'me')
+ self.assertEqual('azure', dsrc.cloud_name)
+ self.assertEqual('azure', dsrc.platform_type)
+ self.assertEqual('config-disk (/dev/cd0)', dsrc.subplatform)
def test_get_data_non_ubuntu_will_not_remove_network_scripts(self):
"""get_data on non-Ubuntu will not remove ubuntu net scripts."""
@@ -414,14 +467,26 @@ fdescfs /dev/fd fdescfs rw 0 0
def test_get_data_on_ubuntu_will_remove_network_scripts(self):
"""get_data will remove ubuntu net scripts on Ubuntu distro."""
+ sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
odata = {'HostName': "myhost", 'UserName': "myuser"}
data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': {}}
+ 'sys_cfg': sys_cfg}
dsrc = self._get_ds(data, distro='ubuntu')
dsrc.get_data()
self.m_remove_ubuntu_network_scripts.assert_called_once_with()
+ def test_get_data_on_ubuntu_will_not_remove_network_scripts_disabled(self):
+ """When apply_network_config false, do not remove scripts on Ubuntu."""
+ sys_cfg = {'datasource': {'Azure': {'apply_network_config': False}}}
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': sys_cfg}
+
+ dsrc = self._get_ds(data, distro='ubuntu')
+ dsrc.get_data()
+ self.m_remove_ubuntu_network_scripts.assert_not_called()
+
def test_crawl_metadata_returns_structured_data_and_caches_nothing(self):
"""Return all structured metadata and cache no class attributes."""
yaml_cfg = "{agent_command: my_command}\n"
@@ -478,6 +543,61 @@ fdescfs /dev/fd fdescfs rw 0 0
dsrc.crawl_metadata()
self.assertEqual(str(cm.exception), error_msg)
+ @mock.patch('cloudinit.sources.DataSourceAzure.EphemeralDHCPv4')
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
+ @mock.patch(
+ 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready')
+ @mock.patch('cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds')
+ def test_crawl_metadata_on_reprovision_reports_ready(
+ self, poll_imds_func,
+ report_ready_func,
+ m_write, m_dhcp):
+ """If reprovisioning, report ready at the end"""
+ ovfenv = construct_valid_ovf_env(
+ platform_settings={"PreprovisionedVm": "True"})
+
+ data = {'ovfcontent': ovfenv,
+ 'sys_cfg': {}}
+ dsrc = self._get_ds(data)
+ poll_imds_func.return_value = ovfenv
+ dsrc.crawl_metadata()
+ self.assertEqual(1, report_ready_func.call_count)
+
+ @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
+ @mock.patch('cloudinit.sources.helpers.netlink.'
+ 'wait_for_media_disconnect_connect')
+ @mock.patch(
+ 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready')
+ @mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
+ @mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
+ @mock.patch('cloudinit.sources.DataSourceAzure.readurl')
+ def test_crawl_metadata_on_reprovision_reports_ready_using_lease(
+ self, m_readurl, m_dhcp,
+ m_net, report_ready_func,
+ m_media_switch, m_write):
+ """If reprovisioning, report ready using the obtained lease"""
+ ovfenv = construct_valid_ovf_env(
+ platform_settings={"PreprovisionedVm": "True"})
+
+ data = {'ovfcontent': ovfenv,
+ 'sys_cfg': {}}
+ dsrc = self._get_ds(data)
+
+ lease = {
+ 'interface': 'eth9', 'fixed-address': '192.168.2.9',
+ 'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
+ 'unknown-245': '624c3620'}
+ m_dhcp.return_value = [lease]
+ m_media_switch.return_value = None
+
+ reprovision_ovfenv = construct_valid_ovf_env()
+ m_readurl.return_value = url_helper.StringResponse(
+ reprovision_ovfenv.encode('utf-8'))
+
+ dsrc.crawl_metadata()
+ self.assertEqual(2, report_ready_func.call_count)
+ report_ready_func.assert_called_with(lease=lease)
+
def test_waagent_d_has_0700_perms(self):
# we expect /var/lib/waagent to be created 0700
dsrc = self._get_ds({'ovfcontent': construct_valid_ovf_env()})
@@ -503,8 +623,10 @@ fdescfs /dev/fd fdescfs rw 0 0
def test_network_config_set_from_imds(self):
"""Datasource.network_config returns IMDS network data."""
+ sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
odata = {}
- data = {'ovfcontent': construct_valid_ovf_env(data=odata)}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': sys_cfg}
expected_network_config = {
'ethernets': {
'eth0': {'set-name': 'eth0',
@@ -769,8 +891,8 @@ fdescfs /dev/fd fdescfs rw 0 0
ds.get_data()
self.assertEqual(self.instance_id, ds.metadata['instance-id'])
- @mock.patch("cloudinit.sources.DataSourceAzure.util.is_FreeBSD")
- @mock.patch("cloudinit.sources.DataSourceAzure._check_freebsd_cdrom")
+ @mock.patch(MOCKPATH + 'util.is_FreeBSD')
+ @mock.patch(MOCKPATH + '_check_freebsd_cdrom')
def test_list_possible_azure_ds_devs(self, m_check_fbsd_cdrom,
m_is_FreeBSD):
"""On FreeBSD, possible devs should show /dev/cd0."""
@@ -783,9 +905,10 @@ fdescfs /dev/fd fdescfs rw 0 0
@mock.patch('cloudinit.net.generate_fallback_config')
def test_imds_network_config(self, mock_fallback):
"""Network config is generated from IMDS network data when present."""
+ sys_cfg = {'datasource': {'Azure': {'apply_network_config': True}}}
odata = {'HostName': "myhost", 'UserName': "myuser"}
data = {'ovfcontent': construct_valid_ovf_env(data=odata),
- 'sys_cfg': {}}
+ 'sys_cfg': sys_cfg}
dsrc = self._get_ds(data)
ret = dsrc.get_data()
@@ -805,6 +928,36 @@ fdescfs /dev/fd fdescfs rw 0 0
@mock.patch('cloudinit.net.get_devicelist')
@mock.patch('cloudinit.net.device_driver')
@mock.patch('cloudinit.net.generate_fallback_config')
+ def test_imds_network_ignored_when_apply_network_config_false(
+ self, mock_fallback, mock_dd, mock_devlist, mock_get_mac):
+ """When apply_network_config is False, use fallback instead of IMDS."""
+ sys_cfg = {'datasource': {'Azure': {'apply_network_config': False}}}
+ odata = {'HostName': "myhost", 'UserName': "myuser"}
+ data = {'ovfcontent': construct_valid_ovf_env(data=odata),
+ 'sys_cfg': sys_cfg}
+ fallback_config = {
+ 'version': 1,
+ 'config': [{
+ 'type': 'physical', 'name': 'eth0',
+ 'mac_address': '00:11:22:33:44:55',
+ 'params': {'driver': 'hv_netsvc'},
+ 'subnets': [{'type': 'dhcp'}],
+ }]
+ }
+ mock_fallback.return_value = fallback_config
+
+ mock_devlist.return_value = ['eth0']
+ mock_dd.return_value = ['hv_netsvc']
+ mock_get_mac.return_value = '00:11:22:33:44:55'
+
+ dsrc = self._get_ds(data)
+ self.assertTrue(dsrc.get_data())
+ self.assertEqual(dsrc.network_config, fallback_config)
+
+ @mock.patch('cloudinit.net.get_interface_mac')
+ @mock.patch('cloudinit.net.get_devicelist')
+ @mock.patch('cloudinit.net.device_driver')
+ @mock.patch('cloudinit.net.generate_fallback_config')
def test_fallback_network_config(self, mock_fallback, mock_dd,
mock_devlist, mock_get_mac):
"""On absent IMDS network data, generate network fallback config."""
@@ -885,17 +1038,17 @@ fdescfs /dev/fd fdescfs rw 0 0
expected_config['config'].append(blacklist_config)
self.assertEqual(netconfig, expected_config)
- @mock.patch("cloudinit.sources.DataSourceAzure.util.subp")
+ @mock.patch(MOCKPATH + 'util.subp')
def test_get_hostname_with_no_args(self, subp):
dsaz.get_hostname()
subp.assert_called_once_with(("hostname",), capture=True)
- @mock.patch("cloudinit.sources.DataSourceAzure.util.subp")
+ @mock.patch(MOCKPATH + 'util.subp')
def test_get_hostname_with_string_arg(self, subp):
dsaz.get_hostname(hostname_command="hostname")
subp.assert_called_once_with(("hostname",), capture=True)
- @mock.patch("cloudinit.sources.DataSourceAzure.util.subp")
+ @mock.patch(MOCKPATH + 'util.subp')
def test_get_hostname_with_iterable_arg(self, subp):
dsaz.get_hostname(hostname_command=("hostname",))
subp.assert_called_once_with(("hostname",), capture=True)
@@ -949,7 +1102,7 @@ class TestAzureBounce(CiTestCase):
self.set_hostname = self.patches.enter_context(
mock.patch.object(dsaz, 'set_hostname'))
self.subp = self.patches.enter_context(
- mock.patch('cloudinit.sources.DataSourceAzure.util.subp'))
+ mock.patch(MOCKPATH + 'util.subp'))
self.find_fallback_nic = self.patches.enter_context(
mock.patch('cloudinit.net.find_fallback_nic', return_value='eth9'))
@@ -989,7 +1142,7 @@ class TestAzureBounce(CiTestCase):
ds.get_data()
self.assertEqual(0, self.set_hostname.call_count)
- @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ @mock.patch(MOCKPATH + 'perform_hostname_bounce')
def test_disabled_bounce_does_not_perform_bounce(
self, perform_hostname_bounce):
cfg = {'hostname_bounce': {'policy': 'off'}}
@@ -1005,7 +1158,7 @@ class TestAzureBounce(CiTestCase):
ds.get_data()
self.assertEqual(0, self.set_hostname.call_count)
- @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ @mock.patch(MOCKPATH + 'perform_hostname_bounce')
def test_unchanged_hostname_does_not_perform_bounce(
self, perform_hostname_bounce):
host_name = 'unchanged-host-name'
@@ -1015,7 +1168,7 @@ class TestAzureBounce(CiTestCase):
ds.get_data()
self.assertEqual(0, perform_hostname_bounce.call_count)
- @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ @mock.patch(MOCKPATH + 'perform_hostname_bounce')
def test_force_performs_bounce_regardless(self, perform_hostname_bounce):
host_name = 'unchanged-host-name'
self.get_hostname.return_value = host_name
@@ -1032,7 +1185,7 @@ class TestAzureBounce(CiTestCase):
cfg = {'hostname_bounce': {'policy': 'force'}}
dsrc = self._get_ds(self.get_ovf_env_with_dscfg(host_name, cfg),
agent_command=['not', '__builtin__'])
- patch_path = 'cloudinit.sources.DataSourceAzure.util.which'
+ patch_path = MOCKPATH + 'util.which'
with mock.patch(patch_path) as m_which:
m_which.return_value = None
ret = self._get_and_setup(dsrc)
@@ -1053,7 +1206,7 @@ class TestAzureBounce(CiTestCase):
self.assertEqual(expected_hostname,
self.set_hostname.call_args_list[0][0][0])
- @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ @mock.patch(MOCKPATH + 'perform_hostname_bounce')
def test_different_hostnames_performs_bounce(
self, perform_hostname_bounce):
expected_hostname = 'azure-expected-host-name'
@@ -1076,7 +1229,7 @@ class TestAzureBounce(CiTestCase):
self.assertEqual(initial_host_name,
self.set_hostname.call_args_list[-1][0][0])
- @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ @mock.patch(MOCKPATH + 'perform_hostname_bounce')
def test_failure_in_bounce_still_resets_host_name(
self, perform_hostname_bounce):
perform_hostname_bounce.side_effect = Exception
@@ -1117,7 +1270,7 @@ class TestAzureBounce(CiTestCase):
self.assertEqual(
dsaz.BOUNCE_COMMAND_IFUP, bounce_args)
- @mock.patch('cloudinit.sources.DataSourceAzure.perform_hostname_bounce')
+ @mock.patch(MOCKPATH + 'perform_hostname_bounce')
def test_set_hostname_option_can_disable_bounce(
self, perform_hostname_bounce):
cfg = {'set_hostname': False, 'hostname_bounce': {'policy': 'force'}}
@@ -1218,12 +1371,12 @@ class TestCanDevBeReformatted(CiTestCase):
def has_ntfs_fs(device):
return bypath.get(device, {}).get('fs') == 'ntfs'
- p = 'cloudinit.sources.DataSourceAzure'
- self._domock(p + "._partitions_on_device", 'm_partitions_on_device')
- self._domock(p + "._has_ntfs_filesystem", 'm_has_ntfs_filesystem')
- self._domock(p + ".util.mount_cb", 'm_mount_cb')
- self._domock(p + ".os.path.realpath", 'm_realpath')
- self._domock(p + ".os.path.exists", 'm_exists')
+ p = MOCKPATH
+ self._domock(p + "_partitions_on_device", 'm_partitions_on_device')
+ self._domock(p + "_has_ntfs_filesystem", 'm_has_ntfs_filesystem')
+ self._domock(p + "util.mount_cb", 'm_mount_cb')
+ self._domock(p + "os.path.realpath", 'm_realpath')
+ self._domock(p + "os.path.exists", 'm_exists')
self.m_exists.side_effect = lambda p: p in bypath
self.m_realpath.side_effect = realpath
@@ -1391,21 +1544,20 @@ class TestCanDevBeReformatted(CiTestCase):
'/dev/sda1': {'num': 1, 'fs': 'ntfs', 'files': []}
}}})
- err = ("Unexpected error while running command.\n",
- "Command: ['mount', '-o', 'ro,sync', '-t', 'auto', ",
- "'/dev/sda1', '/fake-tmp/dir']\n"
- "Exit code: 32\n"
- "Reason: -\n"
- "Stdout: -\n"
- "Stderr: mount: unknown filesystem type 'ntfs'")
- self.m_mount_cb.side_effect = MountFailedError(
- 'Failed mounting %s to %s due to: %s' %
- ('/dev/sda', '/fake-tmp/dir', err))
-
- value, msg = dsaz.can_dev_be_reformatted('/dev/sda',
- preserve_ntfs=False)
- self.assertTrue(value)
- self.assertIn('cannot mount NTFS, assuming', msg)
+ error_msgs = [
+ "Stderr: mount: unknown filesystem type 'ntfs'", # RHEL
+ "Stderr: mount: /dev/sdb1: unknown filesystem type 'ntfs'" # SLES
+ ]
+
+ for err_msg in error_msgs:
+ self.m_mount_cb.side_effect = MountFailedError(
+ "Failed mounting %s to %s due to: \nUnexpected.\n%s" %
+ ('/dev/sda', '/fake-tmp/dir', err_msg))
+
+ value, msg = dsaz.can_dev_be_reformatted('/dev/sda',
+ preserve_ntfs=False)
+ self.assertTrue(value)
+ self.assertIn('cannot mount NTFS, assuming', msg)
def test_never_destroy_ntfs_config_false(self):
"""Normally formattable situation with never_destroy_ntfs set."""
@@ -1488,7 +1640,7 @@ class TestPreprovisioningShouldReprovision(CiTestCase):
self.paths = helpers.Paths({'cloud_dir': tmp})
dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
- @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
+ @mock.patch(MOCKPATH + 'util.write_file')
def test__should_reprovision_with_true_cfg(self, isfile, write_f):
"""The _should_reprovision method should return true with config
flag present."""
@@ -1512,7 +1664,7 @@ class TestPreprovisioningShouldReprovision(CiTestCase):
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
self.assertFalse(dsa._should_reprovision((None, None, {}, None)))
- @mock.patch('cloudinit.sources.DataSourceAzure.DataSourceAzure._poll_imds')
+ @mock.patch(MOCKPATH + 'DataSourceAzure._poll_imds')
def test_reprovision_calls__poll_imds(self, _poll_imds, isfile):
"""_reprovision will poll IMDS."""
isfile.return_value = False
@@ -1527,9 +1679,10 @@ class TestPreprovisioningShouldReprovision(CiTestCase):
@mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
+@mock.patch('cloudinit.sources.helpers.netlink.'
+ 'wait_for_media_disconnect_connect')
@mock.patch('requests.Session.request')
-@mock.patch(
- 'cloudinit.sources.DataSourceAzure.DataSourceAzure._report_ready')
+@mock.patch(MOCKPATH + 'DataSourceAzure._report_ready')
class TestPreprovisioningPollIMDS(CiTestCase):
def setUp(self):
@@ -1539,45 +1692,69 @@ class TestPreprovisioningPollIMDS(CiTestCase):
self.paths = helpers.Paths({'cloud_dir': self.tmp})
dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
- @mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
- def test_poll_imds_calls_report_ready(self, write_f, report_ready_func,
- fake_resp, m_dhcp, m_net):
- """The poll_imds will call report_ready after creating marker file."""
- report_marker = self.tmp_path('report_marker', self.tmp)
+ @mock.patch(MOCKPATH + 'EphemeralDHCPv4')
+ def test_poll_imds_re_dhcp_on_timeout(self, m_dhcpv4, report_ready_func,
+ fake_resp, m_media_switch, m_dhcp,
+ m_net):
+ """The poll_imds will retry DHCP on IMDS timeout."""
+ report_file = self.tmp_path('report_marker', self.tmp)
lease = {
'interface': 'eth9', 'fixed-address': '192.168.2.9',
'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
'unknown-245': '624c3620'}
m_dhcp.return_value = [lease]
+ m_media_switch.return_value = None
+ dhcp_ctx = mock.MagicMock(lease=lease)
+ dhcp_ctx.obtain_lease.return_value = lease
+ m_dhcpv4.return_value = dhcp_ctx
+
+ self.tries = 0
+
+ def fake_timeout_once(**kwargs):
+ self.tries += 1
+ if self.tries == 1:
+ raise requests.Timeout('Fake connection timeout')
+ elif self.tries == 2:
+ response = requests.Response()
+ response.status_code = 404
+ raise requests.exceptions.HTTPError(
+ "fake 404", response=response)
+ # Third try should succeed and stop retries or redhcp
+ return mock.MagicMock(status_code=200, text="good", content="good")
+
+ fake_resp.side_effect = fake_timeout_once
+
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
- mock_path = (
- 'cloudinit.sources.DataSourceAzure.REPORTED_READY_MARKER_FILE')
- with mock.patch(mock_path, report_marker):
+ with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file):
dsa._poll_imds()
self.assertEqual(report_ready_func.call_count, 1)
report_ready_func.assert_called_with(lease=lease)
+ self.assertEqual(3, m_dhcpv4.call_count, 'Expected 3 DHCP calls')
+ self.assertEqual(3, self.tries, 'Expected 3 total reads from IMDS')
- def test_poll_imds_report_ready_false(self, report_ready_func,
- fake_resp, m_dhcp, m_net):
+ def test_poll_imds_report_ready_false(self,
+ report_ready_func, fake_resp,
+ m_media_switch, m_dhcp, m_net):
"""The poll_imds should not call reporting ready
when flag is false"""
- report_marker = self.tmp_path('report_marker', self.tmp)
- write_file(report_marker, content='dont run report_ready :)')
+ report_file = self.tmp_path('report_marker', self.tmp)
+ write_file(report_file, content='dont run report_ready :)')
m_dhcp.return_value = [{
'interface': 'eth9', 'fixed-address': '192.168.2.9',
'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
'unknown-245': '624c3620'}]
+ m_media_switch.return_value = None
dsa = dsaz.DataSourceAzure({}, distro=None, paths=self.paths)
- mock_path = (
- 'cloudinit.sources.DataSourceAzure.REPORTED_READY_MARKER_FILE')
- with mock.patch(mock_path, report_marker):
+ with mock.patch(MOCKPATH + 'REPORTED_READY_MARKER_FILE', report_file):
dsa._poll_imds()
self.assertEqual(report_ready_func.call_count, 0)
-@mock.patch('cloudinit.sources.DataSourceAzure.util.subp')
-@mock.patch('cloudinit.sources.DataSourceAzure.util.write_file')
-@mock.patch('cloudinit.sources.DataSourceAzure.util.is_FreeBSD')
+@mock.patch(MOCKPATH + 'util.subp')
+@mock.patch(MOCKPATH + 'util.write_file')
+@mock.patch(MOCKPATH + 'util.is_FreeBSD')
+@mock.patch('cloudinit.sources.helpers.netlink.'
+ 'wait_for_media_disconnect_connect')
@mock.patch('cloudinit.net.dhcp.EphemeralIPv4Network')
@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
@mock.patch('requests.Session.request')
@@ -1590,10 +1767,13 @@ class TestAzureDataSourcePreprovisioning(CiTestCase):
self.paths = helpers.Paths({'cloud_dir': tmp})
dsaz.BUILTIN_DS_CONFIG['data_dir'] = self.waagent_d
- def test_poll_imds_returns_ovf_env(self, fake_resp, m_dhcp, m_net,
+ def test_poll_imds_returns_ovf_env(self, fake_resp,
+ m_dhcp, m_net,
+ m_media_switch,
m_is_bsd, write_f, subp):
"""The _poll_imds method should return the ovf_env.xml."""
m_is_bsd.return_value = False
+ m_media_switch.return_value = None
m_dhcp.return_value = [{
'interface': 'eth9', 'fixed-address': '192.168.2.9',
'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0'}]
@@ -1611,16 +1791,19 @@ class TestAzureDataSourcePreprovisioning(CiTestCase):
'Cloud-Init/%s' % vs()
}, method='GET', timeout=1,
url=full_url)])
- self.assertEqual(m_dhcp.call_count, 1)
+ self.assertEqual(m_dhcp.call_count, 2)
m_net.assert_any_call(
broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
prefix_or_mask='255.255.255.0', router='192.168.2.1')
- self.assertEqual(m_net.call_count, 1)
+ self.assertEqual(m_net.call_count, 2)
- def test__reprovision_calls__poll_imds(self, fake_resp, m_dhcp, m_net,
+ def test__reprovision_calls__poll_imds(self, fake_resp,
+ m_dhcp, m_net,
+ m_media_switch,
m_is_bsd, write_f, subp):
"""The _reprovision method should call poll IMDS."""
m_is_bsd.return_value = False
+ m_media_switch.return_value = None
m_dhcp.return_value = [{
'interface': 'eth9', 'fixed-address': '192.168.2.9',
'routers': '192.168.2.1', 'subnet-mask': '255.255.255.0',
@@ -1644,11 +1827,11 @@ class TestAzureDataSourcePreprovisioning(CiTestCase):
'User-Agent':
'Cloud-Init/%s' % vs()},
method='GET', timeout=1, url=full_url)])
- self.assertEqual(m_dhcp.call_count, 1)
+ self.assertEqual(m_dhcp.call_count, 2)
m_net.assert_any_call(
broadcast='192.168.2.255', interface='eth9', ip='192.168.2.9',
prefix_or_mask='255.255.255.0', router='192.168.2.1')
- self.assertEqual(m_net.call_count, 1)
+ self.assertEqual(m_net.call_count, 2)
class TestRemoveUbuntuNetworkConfigScripts(CiTestCase):
@@ -1688,7 +1871,7 @@ class TestRemoveUbuntuNetworkConfigScripts(CiTestCase):
self.tmp_path('notfilehere', dir=self.tmp)])
self.assertNotIn('/not/a', self.logs.getvalue()) # No delete logs
- @mock.patch('cloudinit.sources.DataSourceAzure.os.path.exists')
+ @mock.patch(MOCKPATH + 'os.path.exists')
def test_remove_network_scripts_default_removes_stock_scripts(self,
m_exists):
"""Azure's stock ubuntu image scripts and artifacts are removed."""
@@ -1704,14 +1887,14 @@ class TestWBIsPlatformViable(CiTestCase):
"""White box tests for _is_platform_viable."""
with_logs = True
- @mock.patch('cloudinit.sources.DataSourceAzure.util.read_dmi_data')
+ @mock.patch(MOCKPATH + 'util.read_dmi_data')
def test_true_on_non_azure_chassis(self, m_read_dmi_data):
"""Return True if DMI chassis-asset-tag is AZURE_CHASSIS_ASSET_TAG."""
m_read_dmi_data.return_value = dsaz.AZURE_CHASSIS_ASSET_TAG
self.assertTrue(dsaz._is_platform_viable('doesnotmatter'))
- @mock.patch('cloudinit.sources.DataSourceAzure.os.path.exists')
- @mock.patch('cloudinit.sources.DataSourceAzure.util.read_dmi_data')
+ @mock.patch(MOCKPATH + 'os.path.exists')
+ @mock.patch(MOCKPATH + 'util.read_dmi_data')
def test_true_on_azure_ovf_env_in_seed_dir(self, m_read_dmi_data, m_exist):
"""Return True if ovf-env.xml exists in known seed dirs."""
# Non-matching Azure chassis-asset-tag
@@ -1729,7 +1912,7 @@ class TestWBIsPlatformViable(CiTestCase):
and no devices have a label starting with prefix 'rd_rdfe_'.
"""
self.assertFalse(wrap_and_call(
- 'cloudinit.sources.DataSourceAzure',
+ MOCKPATH,
{'os.path.exists': False,
# Non-matching Azure chassis-asset-tag
'util.read_dmi_data': dsaz.AZURE_CHASSIS_ASSET_TAG + 'X',
diff --git a/tests/unittests/test_datasource/test_cloudsigma.py b/tests/unittests/test_datasource/test_cloudsigma.py
index 380ad1b5..3bf52e69 100644
--- a/tests/unittests/test_datasource/test_cloudsigma.py
+++ b/tests/unittests/test_datasource/test_cloudsigma.py
@@ -68,6 +68,12 @@ class DataSourceCloudSigmaTest(test_helpers.CiTestCase):
self.assertEqual(SERVER_CONTEXT['uuid'],
self.datasource.get_instance_id())
+ def test_platform(self):
+ """All platform-related attributes are set."""
+ self.assertEqual(self.datasource.cloud_name, 'cloudsigma')
+ self.assertEqual(self.datasource.platform_type, 'cloudsigma')
+ self.assertEqual(self.datasource.subplatform, 'cepko (/dev/ttyS1)')
+
def test_metadata(self):
self.assertEqual(self.datasource.metadata, SERVER_CONTEXT)
diff --git a/tests/unittests/test_datasource/test_configdrive.py b/tests/unittests/test_datasource/test_configdrive.py
index 231619c9..dcdabea5 100644
--- a/tests/unittests/test_datasource/test_configdrive.py
+++ b/tests/unittests/test_datasource/test_configdrive.py
@@ -478,6 +478,9 @@ class TestConfigDriveDataSource(CiTestCase):
myds = cfg_ds_from_dir(self.tmp, files=CFG_DRIVE_FILES_V2)
self.assertEqual(myds.get_public_ssh_keys(),
[OSTACK_META['public_keys']['mykey']])
+ self.assertEqual('configdrive', myds.cloud_name)
+ self.assertEqual('openstack', myds.platform)
+ self.assertEqual('seed-dir (%s/seed)' % self.tmp, myds.subplatform)
class TestNetJson(CiTestCase):
diff --git a/tests/unittests/test_datasource/test_ec2.py b/tests/unittests/test_datasource/test_ec2.py
index 497e7610..1a5956d9 100644
--- a/tests/unittests/test_datasource/test_ec2.py
+++ b/tests/unittests/test_datasource/test_ec2.py
@@ -211,9 +211,9 @@ class TestEc2(test_helpers.HttprettyTestCase):
self.metadata_addr = self.datasource.metadata_urls[0]
self.tmp = self.tmp_dir()
- def data_url(self, version):
+ def data_url(self, version, data_item='meta-data'):
"""Return a metadata url based on the version provided."""
- return '/'.join([self.metadata_addr, version, 'meta-data', ''])
+ return '/'.join([self.metadata_addr, version, data_item])
def _patch_add_cleanup(self, mpath, *args, **kwargs):
p = mock.patch(mpath, *args, **kwargs)
@@ -238,10 +238,18 @@ class TestEc2(test_helpers.HttprettyTestCase):
all_versions = (
[ds.min_metadata_version] + ds.extended_metadata_versions)
for version in all_versions:
- metadata_url = self.data_url(version)
+ metadata_url = self.data_url(version) + '/'
if version == md_version:
# Register all metadata for desired version
- register_mock_metaserver(metadata_url, md)
+ register_mock_metaserver(
+ metadata_url, md.get('md', DEFAULT_METADATA))
+ userdata_url = self.data_url(
+ version, data_item='user-data')
+ register_mock_metaserver(userdata_url, md.get('ud', ''))
+ identity_url = self.data_url(
+ version, data_item='dynamic/instance-identity')
+ register_mock_metaserver(
+ identity_url, md.get('id', DYNAMIC_METADATA))
else:
instance_id_url = metadata_url + 'instance-id'
if version == ds.min_metadata_version:
@@ -261,7 +269,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds = self._setup_ds(
platform_data=self.valid_platform_data,
sys_cfg={'datasource': {'Ec2': {'strict_id': True}}},
- md=DEFAULT_METADATA)
+ md={'md': DEFAULT_METADATA})
find_fallback_path = (
'cloudinit.sources.DataSourceEc2.net.find_fallback_nic')
with mock.patch(find_fallback_path) as m_find_fallback:
@@ -293,7 +301,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds = self._setup_ds(
platform_data=self.valid_platform_data,
sys_cfg={'datasource': {'Ec2': {'strict_id': True}}},
- md=DEFAULT_METADATA)
+ md={'md': DEFAULT_METADATA})
find_fallback_path = (
'cloudinit.sources.DataSourceEc2.net.find_fallback_nic')
with mock.patch(find_fallback_path) as m_find_fallback:
@@ -322,7 +330,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds = self._setup_ds(
platform_data=self.valid_platform_data,
sys_cfg={'datasource': {'Ec2': {'strict_id': True}}},
- md=DEFAULT_METADATA)
+ md={'md': DEFAULT_METADATA})
ds._network_config = {'cached': 'data'}
self.assertEqual({'cached': 'data'}, ds.network_config)
@@ -338,7 +346,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds = self._setup_ds(
platform_data=self.valid_platform_data,
sys_cfg={'datasource': {'Ec2': {'strict_id': True}}},
- md=old_metadata)
+ md={'md': old_metadata})
self.assertTrue(ds.get_data())
# Provide new revision of metadata that contains network data
register_mock_metaserver(
@@ -351,7 +359,9 @@ class TestEc2(test_helpers.HttprettyTestCase):
m_get_interface_mac.return_value = mac1
nc = ds.network_config # Will re-crawl network metadata
self.assertIsNotNone(nc)
- self.assertIn('Re-crawl of metadata service', self.logs.getvalue())
+ self.assertIn(
+ 'Refreshing stale metadata from prior to upgrade',
+ self.logs.getvalue())
expected = {'version': 1, 'config': [
{'mac_address': '06:17:04:d7:26:09',
'name': 'eth9',
@@ -370,7 +380,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds = self._setup_ds(
platform_data=self.valid_platform_data,
sys_cfg={'datasource': {'Ec2': {'strict_id': False}}},
- md=DEFAULT_METADATA)
+ md={'md': DEFAULT_METADATA})
# Mock 404s on all versions except latest
all_versions = (
[ds.min_metadata_version] + ds.extended_metadata_versions)
@@ -386,7 +396,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
register_mock_metaserver(
'{0}/{1}/dynamic/'.format(ds.metadata_address, all_versions[-1]),
DYNAMIC_METADATA)
- ds._cloud_platform = ec2.Platforms.AWS
+ ds._cloud_name = ec2.CloudNames.AWS
# Setup cached metadata on the Datasource
ds.metadata = DEFAULT_METADATA
self.assertEqual('my-identity-id', ds.get_instance_id())
@@ -397,17 +407,20 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds = self._setup_ds(
platform_data=self.valid_platform_data,
sys_cfg={'datasource': {'Ec2': {'strict_id': True}}},
- md=DEFAULT_METADATA)
+ md={'md': DEFAULT_METADATA})
ret = ds.get_data()
self.assertTrue(ret)
self.assertEqual(0, m_dhcp.call_count)
+ self.assertEqual('aws', ds.cloud_name)
+ self.assertEqual('ec2', ds.platform_type)
+ self.assertEqual('metadata (%s)' % ds.metadata_address, ds.subplatform)
def test_valid_platform_with_strict_false(self):
"""Valid platform data should return true with strict_id false."""
ds = self._setup_ds(
platform_data=self.valid_platform_data,
sys_cfg={'datasource': {'Ec2': {'strict_id': False}}},
- md=DEFAULT_METADATA)
+ md={'md': DEFAULT_METADATA})
ret = ds.get_data()
self.assertTrue(ret)
@@ -417,7 +430,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds = self._setup_ds(
platform_data={'uuid': uuid, 'uuid_source': 'dmi', 'serial': ''},
sys_cfg={'datasource': {'Ec2': {'strict_id': True}}},
- md=DEFAULT_METADATA)
+ md={'md': DEFAULT_METADATA})
ret = ds.get_data()
self.assertFalse(ret)
@@ -427,7 +440,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds = self._setup_ds(
platform_data={'uuid': uuid, 'uuid_source': 'dmi', 'serial': ''},
sys_cfg={'datasource': {'Ec2': {'strict_id': False}}},
- md=DEFAULT_METADATA)
+ md={'md': DEFAULT_METADATA})
ret = ds.get_data()
self.assertTrue(ret)
@@ -437,18 +450,19 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds = self._setup_ds(
platform_data=self.valid_platform_data,
sys_cfg={'datasource': {'Ec2': {'strict_id': False}}},
- md=DEFAULT_METADATA)
+ md={'md': DEFAULT_METADATA})
platform_attrs = [
- attr for attr in ec2.Platforms.__dict__.keys()
+ attr for attr in ec2.CloudNames.__dict__.keys()
if not attr.startswith('__')]
for attr_name in platform_attrs:
- platform_name = getattr(ec2.Platforms, attr_name)
- if platform_name != 'AWS':
- ds._cloud_platform = platform_name
+ platform_name = getattr(ec2.CloudNames, attr_name)
+ if platform_name != 'aws':
+ ds._cloud_name = platform_name
ret = ds.get_data()
+ self.assertEqual('ec2', ds.platform_type)
self.assertFalse(ret)
message = (
- "Local Ec2 mode only supported on ('AWS',),"
+ "Local Ec2 mode only supported on ('aws',),"
' not {0}'.format(platform_name))
self.assertIn(message, self.logs.getvalue())
@@ -463,7 +477,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds = self._setup_ds(
platform_data=self.valid_platform_data,
sys_cfg={'datasource': {'Ec2': {'strict_id': False}}},
- md=DEFAULT_METADATA)
+ md={'md': DEFAULT_METADATA})
ret = ds.get_data()
self.assertFalse(ret)
self.assertIn(
@@ -493,7 +507,7 @@ class TestEc2(test_helpers.HttprettyTestCase):
ds = self._setup_ds(
platform_data=self.valid_platform_data,
sys_cfg={'datasource': {'Ec2': {'strict_id': False}}},
- md=DEFAULT_METADATA)
+ md={'md': DEFAULT_METADATA})
ret = ds.get_data()
self.assertTrue(ret)
diff --git a/tests/unittests/test_datasource/test_ibmcloud.py b/tests/unittests/test_datasource/test_ibmcloud.py
index e639ae47..0b54f585 100644
--- a/tests/unittests/test_datasource/test_ibmcloud.py
+++ b/tests/unittests/test_datasource/test_ibmcloud.py
@@ -1,14 +1,17 @@
# This file is part of cloud-init. See LICENSE file for license information.
+from cloudinit.helpers import Paths
from cloudinit.sources import DataSourceIBMCloud as ibm
from cloudinit.tests import helpers as test_helpers
+from cloudinit import util
import base64
import copy
import json
-import mock
from textwrap import dedent
+mock = test_helpers.mock
+
D_PATH = "cloudinit.sources.DataSourceIBMCloud."
@@ -309,4 +312,39 @@ class TestIsIBMProvisioning(test_helpers.FilesystemMockingTestCase):
self.assertIn("no reference file", self.logs.getvalue())
+class TestDataSourceIBMCloud(test_helpers.CiTestCase):
+
+ def setUp(self):
+ super(TestDataSourceIBMCloud, self).setUp()
+ self.tmp = self.tmp_dir()
+ self.cloud_dir = self.tmp_path('cloud', dir=self.tmp)
+ util.ensure_dir(self.cloud_dir)
+ paths = Paths({'run_dir': self.tmp, 'cloud_dir': self.cloud_dir})
+ self.ds = ibm.DataSourceIBMCloud(
+ sys_cfg={}, distro=None, paths=paths)
+
+ def test_get_data_false(self):
+ """When read_md returns None, get_data returns False."""
+ with mock.patch(D_PATH + 'read_md', return_value=None):
+ self.assertFalse(self.ds.get_data())
+
+ def test_get_data_processes_read_md(self):
+ """get_data processes and caches content returned by read_md."""
+ md = {
+ 'metadata': {}, 'networkdata': 'net', 'platform': 'plat',
+ 'source': 'src', 'system-uuid': 'uuid', 'userdata': 'ud',
+ 'vendordata': 'vd'}
+ with mock.patch(D_PATH + 'read_md', return_value=md):
+ self.assertTrue(self.ds.get_data())
+ self.assertEqual('src', self.ds.source)
+ self.assertEqual('plat', self.ds.platform)
+ self.assertEqual({}, self.ds.metadata)
+ self.assertEqual('ud', self.ds.userdata_raw)
+ self.assertEqual('net', self.ds.network_json)
+ self.assertEqual('vd', self.ds.vendordata_pure)
+ self.assertEqual('uuid', self.ds.system_uuid)
+ self.assertEqual('ibmcloud', self.ds.cloud_name)
+ self.assertEqual('ibmcloud', self.ds.platform_type)
+ self.assertEqual('plat (src)', self.ds.subplatform)
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index 21931eb7..3429272c 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -1,7 +1,10 @@
# This file is part of cloud-init. See LICENSE file for license information.
from cloudinit import helpers
-from cloudinit.sources import DataSourceNoCloud
+from cloudinit.sources.DataSourceNoCloud import (
+ DataSourceNoCloud as dsNoCloud,
+ _maybe_remove_top_network,
+ parse_cmdline_data)
from cloudinit import util
from cloudinit.tests.helpers import CiTestCase, populate_dir, mock, ExitStack
@@ -10,6 +13,7 @@ import textwrap
import yaml
+@mock.patch('cloudinit.sources.DataSourceNoCloud.util.is_lxd')
class TestNoCloudDataSource(CiTestCase):
def setUp(self):
@@ -28,28 +32,46 @@ class TestNoCloudDataSource(CiTestCase):
self.mocks.enter_context(
mock.patch.object(util, 'read_dmi_data', return_value=None))
- def test_nocloud_seed_dir(self):
+ def test_nocloud_seed_dir_on_lxd(self, m_is_lxd):
md = {'instance-id': 'IID', 'dsmode': 'local'}
ud = b"USER_DATA_HERE"
- populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
+ seed_dir = os.path.join(self.paths.seed_dir, "nocloud")
+ populate_dir(seed_dir,
{'user-data': ud, 'meta-data': yaml.safe_dump(md)})
sys_cfg = {
'datasource': {'NoCloud': {'fs_label': None}}
}
- ds = DataSourceNoCloud.DataSourceNoCloud
-
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
self.assertEqual(dsrc.userdata_raw, ud)
self.assertEqual(dsrc.metadata, md)
+ self.assertEqual(dsrc.platform_type, 'lxd')
+ self.assertEqual(
+ dsrc.subplatform, 'seed-dir (%s)' % seed_dir)
self.assertTrue(ret)
- def test_fs_label(self):
- # find_devs_with should not be called ff fs_label is None
- ds = DataSourceNoCloud.DataSourceNoCloud
+ def test_nocloud_seed_dir_non_lxd_platform_is_nocloud(self, m_is_lxd):
+ """Non-lxd environments will list nocloud as the platform."""
+ m_is_lxd.return_value = False
+ md = {'instance-id': 'IID', 'dsmode': 'local'}
+ seed_dir = os.path.join(self.paths.seed_dir, "nocloud")
+ populate_dir(seed_dir,
+ {'user-data': '', 'meta-data': yaml.safe_dump(md)})
+
+ sys_cfg = {
+ 'datasource': {'NoCloud': {'fs_label': None}}
+ }
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ self.assertTrue(dsrc.get_data())
+ self.assertEqual(dsrc.platform_type, 'nocloud')
+ self.assertEqual(
+ dsrc.subplatform, 'seed-dir (%s)' % seed_dir)
+ def test_fs_label(self, m_is_lxd):
+ # find_devs_with should not be called ff fs_label is None
class PsuedoException(Exception):
pass
@@ -59,26 +81,23 @@ class TestNoCloudDataSource(CiTestCase):
# by default, NoCloud should search for filesystems by label
sys_cfg = {'datasource': {'NoCloud': {}}}
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
self.assertRaises(PsuedoException, dsrc.get_data)
# but disabling searching should just end up with None found
sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
self.assertFalse(ret)
- def test_no_datasource_expected(self):
+ def test_no_datasource_expected(self, m_is_lxd):
# no source should be found if no cmdline, config, and fs_label=None
sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
- ds = DataSourceNoCloud.DataSourceNoCloud
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
self.assertFalse(dsrc.get_data())
- def test_seed_in_config(self):
- ds = DataSourceNoCloud.DataSourceNoCloud
-
+ def test_seed_in_config(self, m_is_lxd):
data = {
'fs_label': None,
'meta-data': yaml.safe_dump({'instance-id': 'IID'}),
@@ -86,13 +105,13 @@ class TestNoCloudDataSource(CiTestCase):
}
sys_cfg = {'datasource': {'NoCloud': data}}
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
self.assertEqual(dsrc.userdata_raw, b"USER_DATA_RAW")
self.assertEqual(dsrc.metadata.get('instance-id'), 'IID')
self.assertTrue(ret)
- def test_nocloud_seed_with_vendordata(self):
+ def test_nocloud_seed_with_vendordata(self, m_is_lxd):
md = {'instance-id': 'IID', 'dsmode': 'local'}
ud = b"USER_DATA_HERE"
vd = b"THIS IS MY VENDOR_DATA"
@@ -105,30 +124,26 @@ class TestNoCloudDataSource(CiTestCase):
'datasource': {'NoCloud': {'fs_label': None}}
}
- ds = DataSourceNoCloud.DataSourceNoCloud
-
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
self.assertEqual(dsrc.userdata_raw, ud)
self.assertEqual(dsrc.metadata, md)
self.assertEqual(dsrc.vendordata_raw, vd)
self.assertTrue(ret)
- def test_nocloud_no_vendordata(self):
+ def test_nocloud_no_vendordata(self, m_is_lxd):
populate_dir(os.path.join(self.paths.seed_dir, "nocloud"),
{'user-data': b"ud", 'meta-data': "instance-id: IID\n"})
sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
- ds = DataSourceNoCloud.DataSourceNoCloud
-
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
self.assertEqual(dsrc.userdata_raw, b"ud")
self.assertFalse(dsrc.vendordata)
self.assertTrue(ret)
- def test_metadata_network_interfaces(self):
+ def test_metadata_network_interfaces(self, m_is_lxd):
gateway = "103.225.10.1"
md = {
'instance-id': 'i-abcd',
@@ -149,15 +164,13 @@ class TestNoCloudDataSource(CiTestCase):
sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
- ds = DataSourceNoCloud.DataSourceNoCloud
-
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
self.assertTrue(ret)
# very simple check just for the strings above
self.assertIn(gateway, str(dsrc.network_config))
- def test_metadata_network_config(self):
+ def test_metadata_network_config(self, m_is_lxd):
# network-config needs to get into network_config
netconf = {'version': 1,
'config': [{'type': 'physical', 'name': 'interface0',
@@ -170,14 +183,28 @@ class TestNoCloudDataSource(CiTestCase):
sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
- ds = DataSourceNoCloud.DataSourceNoCloud
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ ret = dsrc.get_data()
+ self.assertTrue(ret)
+ self.assertEqual(netconf, dsrc.network_config)
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ def test_metadata_network_config_with_toplevel_network(self, m_is_lxd):
+ """network-config may have 'network' top level key."""
+ netconf = {'config': 'disabled'}
+ populate_dir(
+ os.path.join(self.paths.seed_dir, "nocloud"),
+ {'user-data': b"ud",
+ 'meta-data': "instance-id: IID\n",
+ 'network-config': yaml.dump({'network': netconf}) + "\n"})
+
+ sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
+
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(netconf, dsrc.network_config)
- def test_metadata_network_config_over_interfaces(self):
+ def test_metadata_network_config_over_interfaces(self, m_is_lxd):
# network-config should override meta-data/network-interfaces
gateway = "103.225.10.1"
md = {
@@ -203,9 +230,7 @@ class TestNoCloudDataSource(CiTestCase):
sys_cfg = {'datasource': {'NoCloud': {'fs_label': None}}}
- ds = DataSourceNoCloud.DataSourceNoCloud
-
- dsrc = ds(sys_cfg=sys_cfg, distro=None, paths=self.paths)
+ dsrc = dsNoCloud(sys_cfg=sys_cfg, distro=None, paths=self.paths)
ret = dsrc.get_data()
self.assertTrue(ret)
self.assertEqual(netconf, dsrc.network_config)
@@ -233,8 +258,7 @@ class TestParseCommandLineData(CiTestCase):
for (fmt, expected) in pairs:
fill = {}
cmdline = fmt % {'ds_id': ds_id}
- ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill,
- cmdline=cmdline)
+ ret = parse_cmdline_data(ds_id=ds_id, fill=fill, cmdline=cmdline)
self.assertEqual(expected, fill)
self.assertTrue(ret)
@@ -251,10 +275,43 @@ class TestParseCommandLineData(CiTestCase):
for cmdline in cmdlines:
fill = {}
- ret = DataSourceNoCloud.parse_cmdline_data(ds_id=ds_id, fill=fill,
- cmdline=cmdline)
+ ret = parse_cmdline_data(ds_id=ds_id, fill=fill, cmdline=cmdline)
self.assertEqual(fill, {})
self.assertFalse(ret)
+class TestMaybeRemoveToplevelNetwork(CiTestCase):
+ """test _maybe_remove_top_network function."""
+ basecfg = [{'type': 'physical', 'name': 'interface0',
+ 'subnets': [{'type': 'dhcp'}]}]
+
+ def test_should_remove_safely(self):
+ mcfg = {'config': self.basecfg, 'version': 1}
+ self.assertEqual(mcfg, _maybe_remove_top_network({'network': mcfg}))
+
+ def test_no_remove_if_other_keys(self):
+ """should not shift if other keys at top level."""
+ mcfg = {'network': {'config': self.basecfg, 'version': 1},
+ 'unknown_keyname': 'keyval'}
+ self.assertEqual(mcfg, _maybe_remove_top_network(mcfg))
+
+ def test_no_remove_if_non_dict(self):
+ """should not shift if not a dict."""
+ mcfg = {'network': '"content here'}
+ self.assertEqual(mcfg, _maybe_remove_top_network(mcfg))
+
+ def test_no_remove_if_missing_config_or_version(self):
+ """should not shift unless network entry has config and version."""
+ mcfg = {'network': {'config': self.basecfg}}
+ self.assertEqual(mcfg, _maybe_remove_top_network(mcfg))
+
+ mcfg = {'network': {'version': 1}}
+ self.assertEqual(mcfg, _maybe_remove_top_network(mcfg))
+
+ def test_remove_with_config_disabled(self):
+ """network/config=disabled should be shifted."""
+ mcfg = {'config': 'disabled'}
+ self.assertEqual(mcfg, _maybe_remove_top_network({'network': mcfg}))
+
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_datasource/test_opennebula.py b/tests/unittests/test_datasource/test_opennebula.py
index 61591017..bb399f6d 100644
--- a/tests/unittests/test_datasource/test_opennebula.py
+++ b/tests/unittests/test_datasource/test_opennebula.py
@@ -123,6 +123,10 @@ class TestOpenNebulaDataSource(CiTestCase):
self.assertTrue(ret)
finally:
util.find_devs_with = orig_find_devs_with
+ self.assertEqual('opennebula', dsrc.cloud_name)
+ self.assertEqual('opennebula', dsrc.platform_type)
+ self.assertEqual(
+ 'seed-dir (%s/seed/opennebula)' % self.tmp, dsrc.subplatform)
def test_seed_dir_non_contextdisk(self):
self.assertRaises(ds.NonContextDiskDir, ds.read_context_disk_dir,
diff --git a/tests/unittests/test_datasource/test_ovf.py b/tests/unittests/test_datasource/test_ovf.py
index 9d52eb99..a226c032 100644
--- a/tests/unittests/test_datasource/test_ovf.py
+++ b/tests/unittests/test_datasource/test_ovf.py
@@ -11,7 +11,7 @@ from collections import OrderedDict
from textwrap import dedent
from cloudinit import util
-from cloudinit.tests.helpers import CiTestCase, wrap_and_call
+from cloudinit.tests.helpers import CiTestCase, mock, wrap_and_call
from cloudinit.helpers import Paths
from cloudinit.sources import DataSourceOVF as dsovf
from cloudinit.sources.helpers.vmware.imc.config_custom_script import (
@@ -120,7 +120,7 @@ class TestDatasourceOVF(CiTestCase):
def test_get_data_false_on_none_dmi_data(self):
"""When dmi for system-product-name is None, get_data returns False."""
- paths = Paths({'seed_dir': self.tdir})
+ paths = Paths({'cloud_dir': self.tdir})
ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
retcode = wrap_and_call(
'cloudinit.sources.DataSourceOVF',
@@ -134,7 +134,7 @@ class TestDatasourceOVF(CiTestCase):
def test_get_data_no_vmware_customization_disabled(self):
"""When vmware customization is disabled via sys_cfg log a message."""
- paths = Paths({'seed_dir': self.tdir})
+ paths = Paths({'cloud_dir': self.tdir})
ds = self.datasource(
sys_cfg={'disable_vmware_customization': True}, distro={},
paths=paths)
@@ -153,7 +153,7 @@ class TestDatasourceOVF(CiTestCase):
"""When cloud-init workflow for vmware is enabled via sys_cfg log a
message.
"""
- paths = Paths({'seed_dir': self.tdir})
+ paths = Paths({'cloud_dir': self.tdir})
ds = self.datasource(
sys_cfg={'disable_vmware_customization': False}, distro={},
paths=paths)
@@ -178,6 +178,50 @@ class TestDatasourceOVF(CiTestCase):
self.assertIn('Script %s not found!!' % customscript,
str(context.exception))
+ def test_get_data_non_vmware_seed_platform_info(self):
+ """Platform info properly reports when on non-vmware platforms."""
+ paths = Paths({'cloud_dir': self.tdir, 'run_dir': self.tdir})
+ # Write ovf-env.xml seed file
+ seed_dir = self.tmp_path('seed', dir=self.tdir)
+ ovf_env = self.tmp_path('ovf-env.xml', dir=seed_dir)
+ util.write_file(ovf_env, OVF_ENV_CONTENT)
+ ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
+
+ self.assertEqual('ovf', ds.cloud_name)
+ self.assertEqual('ovf', ds.platform_type)
+ MPATH = 'cloudinit.sources.DataSourceOVF.'
+ with mock.patch(MPATH + 'util.read_dmi_data', return_value='!VMware'):
+ with mock.patch(MPATH + 'transport_vmware_guestd') as m_guestd:
+ with mock.patch(MPATH + 'transport_iso9660') as m_iso9660:
+ m_iso9660.return_value = (None, 'ignored', 'ignored')
+ m_guestd.return_value = (None, 'ignored', 'ignored')
+ self.assertTrue(ds.get_data())
+ self.assertEqual(
+ 'ovf (%s/seed/ovf-env.xml)' % self.tdir,
+ ds.subplatform)
+
+ def test_get_data_vmware_seed_platform_info(self):
+ """Platform info properly reports when on VMware platform."""
+ paths = Paths({'cloud_dir': self.tdir, 'run_dir': self.tdir})
+ # Write ovf-env.xml seed file
+ seed_dir = self.tmp_path('seed', dir=self.tdir)
+ ovf_env = self.tmp_path('ovf-env.xml', dir=seed_dir)
+ util.write_file(ovf_env, OVF_ENV_CONTENT)
+ ds = self.datasource(sys_cfg={}, distro={}, paths=paths)
+
+ self.assertEqual('ovf', ds.cloud_name)
+ self.assertEqual('ovf', ds.platform_type)
+ MPATH = 'cloudinit.sources.DataSourceOVF.'
+ with mock.patch(MPATH + 'util.read_dmi_data', return_value='VMWare'):
+ with mock.patch(MPATH + 'transport_vmware_guestd') as m_guestd:
+ with mock.patch(MPATH + 'transport_iso9660') as m_iso9660:
+ m_iso9660.return_value = (None, 'ignored', 'ignored')
+ m_guestd.return_value = (None, 'ignored', 'ignored')
+ self.assertTrue(ds.get_data())
+ self.assertEqual(
+ 'vmware (%s/seed/ovf-env.xml)' % self.tdir,
+ ds.subplatform)
+
class TestTransportIso9660(CiTestCase):
diff --git a/tests/unittests/test_datasource/test_smartos.py b/tests/unittests/test_datasource/test_smartos.py
index 46d67b94..42ac6971 100644
--- a/tests/unittests/test_datasource/test_smartos.py
+++ b/tests/unittests/test_datasource/test_smartos.py
@@ -426,6 +426,13 @@ class TestSmartOSDataSource(FilesystemMockingTestCase):
self.assertEqual(MOCK_RETURNS['sdc:uuid'],
dsrc.metadata['instance-id'])
+ def test_platform_info(self):
+ """All platform-related attributes are properly set."""
+ dsrc = self._get_ds(mockdata=MOCK_RETURNS)
+ self.assertEqual('joyent', dsrc.cloud_name)
+ self.assertEqual('joyent', dsrc.platform_type)
+ self.assertEqual('serial (/dev/ttyS1)', dsrc.subplatform)
+
def test_root_keys(self):
dsrc = self._get_ds(mockdata=MOCK_RETURNS)
ret = dsrc.get_data()
diff --git a/tests/unittests/test_ds_identify.py b/tests/unittests/test_ds_identify.py
index 46778e95..80640f19 100644
--- a/tests/unittests/test_ds_identify.py
+++ b/tests/unittests/test_ds_identify.py
@@ -499,7 +499,7 @@ class TestDsIdentify(DsIdentifyBase):
# Add recognized labels
valid_ovf_labels = ['ovf-transport', 'OVF-TRANSPORT',
- "OVFENV", "ovfenv"]
+ "OVFENV", "ovfenv", "OVF ENV", "ovf env"]
for valid_ovf_label in valid_ovf_labels:
ovf_cdrom_by_label['mocks'][0]['out'] = blkid_out([
{'DEVNAME': 'sda1', 'TYPE': 'ext4', 'LABEL': 'rootfs'},
diff --git a/tests/unittests/test_handler/test_handler_resizefs.py b/tests/unittests/test_handler/test_handler_resizefs.py
index feca56c2..35187847 100644
--- a/tests/unittests/test_handler/test_handler_resizefs.py
+++ b/tests/unittests/test_handler/test_handler_resizefs.py
@@ -151,9 +151,9 @@ class TestResizefs(CiTestCase):
_resize_ufs(mount_point, devpth))
@mock.patch('cloudinit.util.is_container', return_value=False)
- @mock.patch('cloudinit.util.get_mount_info')
- @mock.patch('cloudinit.util.get_device_info_from_zpool')
@mock.patch('cloudinit.util.parse_mount')
+ @mock.patch('cloudinit.util.get_device_info_from_zpool')
+ @mock.patch('cloudinit.util.get_mount_info')
def test_handle_zfs_root(self, mount_info, zpool_info, parse_mount,
is_container):
devpth = 'vmzroot/ROOT/freebsd'
@@ -173,6 +173,38 @@ class TestResizefs(CiTestCase):
self.assertEqual(('zpool', 'online', '-e', 'vmzroot', disk), ret)
+ @mock.patch('cloudinit.util.is_container', return_value=False)
+ @mock.patch('cloudinit.util.get_mount_info')
+ @mock.patch('cloudinit.util.get_device_info_from_zpool')
+ @mock.patch('cloudinit.util.parse_mount')
+ def test_handle_modern_zfsroot(self, mount_info, zpool_info, parse_mount,
+ is_container):
+ devpth = 'zroot/ROOT/default'
+ disk = 'da0p3'
+ fs_type = 'zfs'
+ mount_point = '/'
+
+ mount_info.return_value = (devpth, fs_type, mount_point)
+ zpool_info.return_value = disk
+ parse_mount.return_value = (devpth, fs_type, mount_point)
+
+ cfg = {'resize_rootfs': True}
+
+ def fake_stat(devpath):
+ if devpath == disk:
+ raise OSError("not here")
+ FakeStat = namedtuple(
+ 'FakeStat', ['st_mode', 'st_size', 'st_mtime']) # minimal stat
+ return FakeStat(25008, 0, 1) # fake char block device
+
+ with mock.patch('cloudinit.config.cc_resizefs.do_resize') as dresize:
+ with mock.patch('cloudinit.config.cc_resizefs.os.stat') as m_stat:
+ m_stat.side_effect = fake_stat
+ handle('cc_resizefs', cfg, _cloud=None, log=LOG, args=[])
+
+ self.assertEqual(('zpool', 'online', '-e', 'zroot', '/dev/' + disk),
+ dresize.call_args[0][0])
+
class TestRootDevFromCmdline(CiTestCase):
@@ -246,39 +278,39 @@ class TestMaybeGetDevicePathAsWritableBlock(CiTestCase):
def test_maybe_get_writable_device_path_does_not_exist(self):
"""When devpath does not exist, a warning is logged."""
- info = 'dev=/I/dont/exist mnt_point=/ path=/dev/none'
+ info = 'dev=/dev/I/dont/exist mnt_point=/ path=/dev/none'
devpath = wrap_and_call(
'cloudinit.config.cc_resizefs.util',
{'is_container': {'return_value': False}},
- maybe_get_writable_device_path, '/I/dont/exist', info, LOG)
+ maybe_get_writable_device_path, '/dev/I/dont/exist', info, LOG)
self.assertIsNone(devpath)
self.assertIn(
- "WARNING: Device '/I/dont/exist' did not exist."
+ "WARNING: Device '/dev/I/dont/exist' did not exist."
' cannot resize: %s' % info,
self.logs.getvalue())
def test_maybe_get_writable_device_path_does_not_exist_in_container(self):
"""When devpath does not exist in a container, log a debug message."""
- info = 'dev=/I/dont/exist mnt_point=/ path=/dev/none'
+ info = 'dev=/dev/I/dont/exist mnt_point=/ path=/dev/none'
devpath = wrap_and_call(
'cloudinit.config.cc_resizefs.util',
{'is_container': {'return_value': True}},
- maybe_get_writable_device_path, '/I/dont/exist', info, LOG)
+ maybe_get_writable_device_path, '/dev/I/dont/exist', info, LOG)
self.assertIsNone(devpath)
self.assertIn(
- "DEBUG: Device '/I/dont/exist' did not exist in container."
+ "DEBUG: Device '/dev/I/dont/exist' did not exist in container."
' cannot resize: %s' % info,
self.logs.getvalue())
def test_maybe_get_writable_device_path_raises_oserror(self):
"""When unexpected OSError is raises by os.stat it is reraised."""
- info = 'dev=/I/dont/exist mnt_point=/ path=/dev/none'
+ info = 'dev=/dev/I/dont/exist mnt_point=/ path=/dev/none'
with self.assertRaises(OSError) as context_manager:
wrap_and_call(
'cloudinit.config.cc_resizefs',
{'util.is_container': {'return_value': True},
'os.stat': {'side_effect': OSError('Something unexpected')}},
- maybe_get_writable_device_path, '/I/dont/exist', info, LOG)
+ maybe_get_writable_device_path, '/dev/I/dont/exist', info, LOG)
self.assertEqual(
'Something unexpected', str(context_manager.exception))
diff --git a/tests/unittests/test_handler/test_handler_write_files.py b/tests/unittests/test_handler/test_handler_write_files.py
index 7fa8fd21..bc8756ca 100644
--- a/tests/unittests/test_handler/test_handler_write_files.py
+++ b/tests/unittests/test_handler/test_handler_write_files.py
@@ -52,6 +52,18 @@ class TestWriteFiles(FilesystemMockingTestCase):
"test_simple", [{"content": expected, "path": filename}])
self.assertEqual(util.load_file(filename), expected)
+ def test_append(self):
+ self.patchUtils(self.tmp)
+ existing = "hello "
+ added = "world\n"
+ expected = existing + added
+ filename = "/tmp/append.file"
+ util.write_file(filename, existing)
+ write_files(
+ "test_append",
+ [{"content": added, "path": filename, "append": "true"}])
+ self.assertEqual(util.load_file(filename), expected)
+
def test_yaml_binary(self):
self.patchUtils(self.tmp)
data = util.load_yaml(YAML_TEXT)
diff --git a/tests/unittests/test_net.py b/tests/unittests/test_net.py
index 5d9c7d92..195f261c 100644
--- a/tests/unittests/test_net.py
+++ b/tests/unittests/test_net.py
@@ -488,8 +488,8 @@ NETWORK_CONFIGS = {
address 192.168.21.3/24
dns-nameservers 8.8.8.8 8.8.4.4
dns-search barley.maas sach.maas
- post-up route add default gw 65.61.151.37 || true
- pre-down route del default gw 65.61.151.37 || true
+ post-up route add default gw 65.61.151.37 metric 10000 || true
+ pre-down route del default gw 65.61.151.37 metric 10000 || true
""").rstrip(' '),
'expected_netplan': textwrap.dedent("""
network:
@@ -513,7 +513,8 @@ NETWORK_CONFIGS = {
- barley.maas
- sach.maas
routes:
- - to: 0.0.0.0/0
+ - metric: 10000
+ to: 0.0.0.0/0
via: 65.61.151.37
set-name: eth99
""").rstrip(' '),
@@ -537,6 +538,7 @@ NETWORK_CONFIGS = {
HWADDR=c0:d6:9f:2c:e8:80
IPADDR=192.168.21.3
NETMASK=255.255.255.0
+ METRIC=10000
NM_CONTROLLED=no
ONBOOT=yes
TYPE=Ethernet
@@ -561,7 +563,7 @@ NETWORK_CONFIGS = {
- gateway: 65.61.151.37
netmask: 0.0.0.0
network: 0.0.0.0
- metric: 2
+ metric: 10000
- type: physical
name: eth1
mac_address: "cf:d6:af:48:e8:80"
@@ -1161,6 +1163,13 @@ pre-down route del -net 10.0.0.0 netmask 255.0.0.0 gw 11.0.0.1 metric 3 || true
- gateway: 192.168.0.3
netmask: 255.255.255.0
network: 10.1.3.0
+ - gateway: 2001:67c:1562:1
+ network: 2001:67c:1
+ netmask: ffff:ffff:0
+ - gateway: 3001:67c:1562:1
+ network: 3001:67c:1
+ netmask: ffff:ffff:0
+ metric: 10000
- type: static
address: 192.168.1.2/24
- type: static
@@ -1197,6 +1206,11 @@ pre-down route del -net 10.0.0.0 netmask 255.0.0.0 gw 11.0.0.1 metric 3 || true
routes:
- to: 10.1.3.0/24
via: 192.168.0.3
+ - to: 2001:67c:1/32
+ via: 2001:67c:1562:1
+ - metric: 10000
+ to: 3001:67c:1/32
+ via: 3001:67c:1562:1
"""),
'yaml-v2': textwrap.dedent("""
version: 2
@@ -1228,6 +1242,11 @@ pre-down route del -net 10.0.0.0 netmask 255.0.0.0 gw 11.0.0.1 metric 3 || true
routes:
- to: 10.1.3.0/24
via: 192.168.0.3
+ - to: 2001:67c:1562:8007::1/64
+ via: 2001:67c:1562:8007::aac:40b2
+ - metric: 10000
+ to: 3001:67c:1562:8007::1/64
+ via: 3001:67c:1562:8007::aac:40b2
"""),
'expected_netplan-v2': textwrap.dedent("""
network:
@@ -1249,6 +1268,11 @@ pre-down route del -net 10.0.0.0 netmask 255.0.0.0 gw 11.0.0.1 metric 3 || true
routes:
- to: 10.1.3.0/24
via: 192.168.0.3
+ - to: 2001:67c:1562:8007::1/64
+ via: 2001:67c:1562:8007::aac:40b2
+ - metric: 10000
+ to: 3001:67c:1562:8007::1/64
+ via: 3001:67c:1562:8007::aac:40b2
ethernets:
eth0:
match:
@@ -1349,6 +1373,10 @@ pre-down route del -net 10.0.0.0 netmask 255.0.0.0 gw 11.0.0.1 metric 3 || true
USERCTL=no
"""),
'route6-bond0': textwrap.dedent("""\
+ # Created by cloud-init on instance boot automatically, do not edit.
+ #
+ 2001:67c:1/ffff:ffff:0 via 2001:67c:1562:1 dev bond0
+ 3001:67c:1/ffff:ffff:0 via 3001:67c:1562:1 metric 10000 dev bond0
"""),
'route-bond0': textwrap.dedent("""\
ADDRESS0=10.1.3.0
@@ -1879,14 +1907,24 @@ class TestRhelSysConfigRendering(CiTestCase):
return dir2dict(dir)
def _compare_files_to_expected(self, expected, found):
+
+ def _try_load(f):
+ ''' Attempt to load shell content, otherwise return as-is '''
+ try:
+ return util.load_shell_content(f)
+ except ValueError:
+ pass
+ # route6- * files aren't shell content, but iproute2 params
+ return f
+
orig_maxdiff = self.maxDiff
expected_d = dict(
- (os.path.join(self.scripts_dir, k), util.load_shell_content(v))
+ (os.path.join(self.scripts_dir, k), _try_load(v))
for k, v in expected.items())
# only compare the files in scripts_dir
scripts_found = dict(
- (k, util.load_shell_content(v)) for k, v in found.items()
+ (k, _try_load(v)) for k, v in found.items()
if k.startswith(self.scripts_dir))
try:
self.maxDiff = None
@@ -3339,9 +3377,23 @@ class TestGetInterfacesByMac(CiTestCase):
addnics = ('greptap1', 'lo', 'greptap2')
self.data['macs'].update(dict((k, empty_mac) for k in addnics))
self.data['devices'].update(set(addnics))
+ self.data['own_macs'].extend(list(addnics))
ret = net.get_interfaces_by_mac()
self.assertEqual('lo', ret[empty_mac])
+ def test_skip_all_zeros(self):
+ """Any mac of 00:... should be skipped."""
+ self._mock_setup()
+ emac1, emac2, emac4, emac6 = (
+ '00', '00:00', '00:00:00:00', '00:00:00:00:00:00')
+ addnics = {'empty1': emac1, 'emac2a': emac2, 'emac2b': emac2,
+ 'emac4': emac4, 'emac6': emac6}
+ self.data['macs'].update(addnics)
+ self.data['devices'].update(set(addnics))
+ self.data['own_macs'].extend(addnics.keys())
+ ret = net.get_interfaces_by_mac()
+ self.assertEqual('lo', ret['00:00:00:00:00:00'])
+
def test_ib(self):
ib_addr = '80:00:00:28:fe:80:00:00:00:00:00:00:00:11:22:03:00:33:44:56'
ib_addr_eth_format = '00:11:22:33:44:56'
diff --git a/tests/unittests/test_vmware_config_file.py b/tests/unittests/test_vmware_config_file.py
index 602dedb0..f47335ea 100644
--- a/tests/unittests/test_vmware_config_file.py
+++ b/tests/unittests/test_vmware_config_file.py
@@ -263,7 +263,7 @@ class TestVmwareConfigFile(CiTestCase):
nicConfigurator = NicConfigurator(config.nics, False)
nics_cfg_list = nicConfigurator.generate()
- self.assertEqual(5, len(nics_cfg_list), "number of elements")
+ self.assertEqual(2, len(nics_cfg_list), "number of elements")
nic1 = {'name': 'NIC1'}
nic2 = {'name': 'NIC2'}
@@ -275,8 +275,6 @@ class TestVmwareConfigFile(CiTestCase):
nic1.update(cfg)
elif cfg.get('name') == nic2.get('name'):
nic2.update(cfg)
- elif cfg_type == 'route':
- route_list.append(cfg)
self.assertEqual('physical', nic1.get('type'), 'type of NIC1')
self.assertEqual('NIC1', nic1.get('name'), 'name of NIC1')
@@ -297,6 +295,9 @@ class TestVmwareConfigFile(CiTestCase):
static6_subnet.append(subnet)
else:
self.assertEqual(True, False, 'Unknown type')
+ if 'route' in subnet:
+ for route in subnet.get('routes'):
+ route_list.append(route)
self.assertEqual(1, len(static_subnet), 'Number of static subnet')
self.assertEqual(1, len(static6_subnet), 'Number of static6 subnet')
@@ -351,6 +352,8 @@ class TestVmwareConfigFile(CiTestCase):
class TestVmwareNetConfig(CiTestCase):
"""Test conversion of vmware config to cloud-init config."""
+ maxDiff = None
+
def _get_NicConfigurator(self, text):
fp = None
try:
@@ -420,9 +423,52 @@ class TestVmwareNetConfig(CiTestCase):
'mac_address': '00:50:56:a6:8c:08',
'subnets': [
{'control': 'auto', 'type': 'static',
- 'address': '10.20.87.154', 'netmask': '255.255.252.0'}]},
- {'type': 'route', 'destination': '10.20.84.0/22',
- 'gateway': '10.20.87.253', 'metric': 10000}],
+ 'address': '10.20.87.154', 'netmask': '255.255.252.0',
+ 'routes':
+ [{'type': 'route', 'destination': '10.20.84.0/22',
+ 'gateway': '10.20.87.253', 'metric': 10000}]}]}],
+ nc.generate())
+
+ def test_cust_non_primary_nic_with_gateway_(self):
+ """A customer non primary nic set can have a gateway."""
+ config = textwrap.dedent("""\
+ [NETWORK]
+ NETWORKING = yes
+ BOOTPROTO = dhcp
+ HOSTNAME = static-debug-vm
+ DOMAINNAME = cluster.local
+
+ [NIC-CONFIG]
+ NICS = NIC1
+
+ [NIC1]
+ MACADDR = 00:50:56:ac:d1:8a
+ ONBOOT = yes
+ IPv4_MODE = BACKWARDS_COMPATIBLE
+ BOOTPROTO = static
+ IPADDR = 100.115.223.75
+ NETMASK = 255.255.255.0
+ GATEWAY = 100.115.223.254
+
+
+ [DNS]
+ DNSFROMDHCP=no
+
+ NAMESERVER|1 = 8.8.8.8
+
+ [DATETIME]
+ UTC = yes
+ """)
+ nc = self._get_NicConfigurator(config)
+ self.assertEqual(
+ [{'type': 'physical', 'name': 'NIC1',
+ 'mac_address': '00:50:56:ac:d1:8a',
+ 'subnets': [
+ {'control': 'auto', 'type': 'static',
+ 'address': '100.115.223.75', 'netmask': '255.255.255.0',
+ 'routes':
+ [{'type': 'route', 'destination': '100.115.223.0/24',
+ 'gateway': '100.115.223.254', 'metric': 10000}]}]}],
nc.generate())
def test_a_primary_nic_with_gateway(self):