summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/test_data.py50
-rw-r--r--tests/unittests/test_datasource/test_ec2.py33
-rw-r--r--tests/unittests/test_handler/test_handler_etc_hosts.py69
-rw-r--r--tests/unittests/test_handler/test_handler_ntp.py26
-rw-r--r--tests/unittests/test_rh_subscription.py15
5 files changed, 193 insertions, 0 deletions
diff --git a/tests/unittests/test_data.py b/tests/unittests/test_data.py
index 6d621d26..275b16d2 100644
--- a/tests/unittests/test_data.py
+++ b/tests/unittests/test_data.py
@@ -18,6 +18,8 @@ from email.mime.application import MIMEApplication
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
+import httpretty
+
from cloudinit import handlers
from cloudinit import helpers as c_helpers
from cloudinit import log
@@ -522,6 +524,54 @@ c: 4
self.assertEqual(cfg.get('password'), 'gocubs')
self.assertEqual(cfg.get('locale'), 'chicago')
+ @httpretty.activate
+ @mock.patch('cloudinit.url_helper.time.sleep')
+ def test_include(self, mock_sleep):
+ """Test #include."""
+ included_url = 'http://hostname/path'
+ included_data = '#cloud-config\nincluded: true\n'
+ httpretty.register_uri(httpretty.GET, included_url, included_data)
+
+ blob = '#include\n%s\n' % included_url
+
+ self.reRoot()
+ ci = stages.Init()
+ ci.datasource = FakeDataSource(blob)
+ ci.fetch()
+ ci.consume_data()
+ cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
+ cc = util.load_yaml(cc_contents)
+ self.assertTrue(cc.get('included'))
+
+ @httpretty.activate
+ @mock.patch('cloudinit.url_helper.time.sleep')
+ def test_include_bad_url(self, mock_sleep):
+ """Test #include with a bad URL."""
+ bad_url = 'http://bad/forbidden'
+ bad_data = '#cloud-config\nbad: true\n'
+ httpretty.register_uri(httpretty.GET, bad_url, bad_data, status=403)
+
+ included_url = 'http://hostname/path'
+ included_data = '#cloud-config\nincluded: true\n'
+ httpretty.register_uri(httpretty.GET, included_url, included_data)
+
+ blob = '#include\n%s\n%s' % (bad_url, included_url)
+
+ self.reRoot()
+ ci = stages.Init()
+ ci.datasource = FakeDataSource(blob)
+ log_file = self.capture_log(logging.WARNING)
+ ci.fetch()
+ ci.consume_data()
+
+ self.assertIn("403 Client Error: Forbidden for url: %s" % bad_url,
+ log_file.getvalue())
+
+ cc_contents = util.load_file(ci.paths.get_ipath("cloud_config"))
+ cc = util.load_yaml(cc_contents)
+ self.assertIsNone(cc.get('bad'))
+ self.assertTrue(cc.get('included'))
+
class TestUDProcess(helpers.ResourceUsingTestCase):
diff --git a/tests/unittests/test_datasource/test_ec2.py b/tests/unittests/test_datasource/test_ec2.py
index 6af699a6..ba328ee9 100644
--- a/tests/unittests/test_datasource/test_ec2.py
+++ b/tests/unittests/test_datasource/test_ec2.py
@@ -307,6 +307,39 @@ class TestEc2(test_helpers.HttprettyTestCase):
@httpretty.activate
@mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
+ def test_network_config_cached_property_refreshed_on_upgrade(self, m_dhcp):
+ """Refresh the network_config Ec2 cache if network key is absent.
+
+ This catches an upgrade issue where obj.pkl contained stale metadata
+ which lacked newly required network key.
+ """
+ old_metadata = copy.deepcopy(DEFAULT_METADATA)
+ old_metadata.pop('network')
+ ds = self._setup_ds(
+ platform_data=self.valid_platform_data,
+ sys_cfg={'datasource': {'Ec2': {'strict_id': True}}},
+ md=old_metadata)
+ self.assertTrue(ds.get_data())
+ # Provide new revision of metadata that contains network data
+ register_mock_metaserver(
+ 'http://169.254.169.254/2009-04-04/meta-data/', DEFAULT_METADATA)
+ mac1 = '06:17:04:d7:26:09' # Defined in DEFAULT_METADATA
+ get_interface_mac_path = (
+ 'cloudinit.sources.DataSourceEc2.net.get_interface_mac')
+ ds.fallback_nic = 'eth9'
+ with mock.patch(get_interface_mac_path) as m_get_interface_mac:
+ m_get_interface_mac.return_value = mac1
+ ds.network_config # Will re-crawl network metadata
+ self.assertIn('Re-crawl of metadata service', self.logs.getvalue())
+ expected = {'version': 1, 'config': [
+ {'mac_address': '06:17:04:d7:26:09',
+ 'name': 'eth9',
+ 'subnets': [{'type': 'dhcp4'}, {'type': 'dhcp6'}],
+ 'type': 'physical'}]}
+ self.assertEqual(expected, ds.network_config)
+
+ @httpretty.activate
+ @mock.patch('cloudinit.net.dhcp.maybe_perform_dhcp_discovery')
def test_valid_platform_with_strict_true(self, m_dhcp):
"""Valid platform data should return true with strict_id true."""
ds = self._setup_ds(
diff --git a/tests/unittests/test_handler/test_handler_etc_hosts.py b/tests/unittests/test_handler/test_handler_etc_hosts.py
new file mode 100644
index 00000000..ced05a8d
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_etc_hosts.py
@@ -0,0 +1,69 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+from cloudinit.config import cc_update_etc_hosts
+
+from cloudinit import cloud
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import util
+
+from cloudinit.tests import helpers as t_help
+
+import logging
+import os
+import shutil
+
+LOG = logging.getLogger(__name__)
+
+
+class TestHostsFile(t_help.FilesystemMockingTestCase):
+ def setUp(self):
+ super(TestHostsFile, self).setUp()
+ self.tmp = self.tmp_dir()
+
+ def _fetch_distro(self, kind):
+ cls = distros.fetch(kind)
+ paths = helpers.Paths({})
+ return cls(kind, {}, paths)
+
+ def test_write_etc_hosts_suse_localhost(self):
+ cfg = {
+ 'manage_etc_hosts': 'localhost',
+ 'hostname': 'cloud-init.test.us'
+ }
+ os.makedirs('%s/etc/' % self.tmp)
+ hosts_content = '192.168.1.1 blah.blah.us blah\n'
+ fout = open('%s/etc/hosts' % self.tmp, 'w')
+ fout.write(hosts_content)
+ fout.close()
+ distro = self._fetch_distro('sles')
+ distro.hosts_fn = '%s/etc/hosts' % self.tmp
+ paths = helpers.Paths({})
+ ds = None
+ cc = cloud.Cloud(ds, paths, {}, distro, None)
+ self.patchUtils(self.tmp)
+ cc_update_etc_hosts.handle('test', cfg, cc, LOG, [])
+ contents = util.load_file('%s/etc/hosts' % self.tmp)
+ if '127.0.0.1\tcloud-init.test.us\tcloud-init' not in contents:
+ self.assertIsNone('No entry for 127.0.0.1 in etc/hosts')
+ if '192.168.1.1\tblah.blah.us\tblah' not in contents:
+ self.assertIsNone('Default etc/hosts content modified')
+
+ def test_write_etc_hosts_suse_template(self):
+ cfg = {
+ 'manage_etc_hosts': 'template',
+ 'hostname': 'cloud-init.test.us'
+ }
+ shutil.copytree('templates', '%s/etc/cloud/templates' % self.tmp)
+ distro = self._fetch_distro('sles')
+ paths = helpers.Paths({})
+ paths.template_tpl = '%s' % self.tmp + '/etc/cloud/templates/%s.tmpl'
+ ds = None
+ cc = cloud.Cloud(ds, paths, {}, distro, None)
+ self.patchUtils(self.tmp)
+ cc_update_etc_hosts.handle('test', cfg, cc, LOG, [])
+ contents = util.load_file('%s/etc/hosts' % self.tmp)
+ if '127.0.0.1 cloud-init.test.us cloud-init' not in contents:
+ self.assertIsNone('No entry for 127.0.0.1 in etc/hosts')
+ if '::1 cloud-init.test.us cloud-init' not in contents:
+ self.assertIsNone('No entry for 127.0.0.1 in etc/hosts')
diff --git a/tests/unittests/test_handler/test_handler_ntp.py b/tests/unittests/test_handler/test_handler_ntp.py
index 3abe5786..28a8455d 100644
--- a/tests/unittests/test_handler/test_handler_ntp.py
+++ b/tests/unittests/test_handler/test_handler_ntp.py
@@ -430,5 +430,31 @@ class TestNtp(FilesystemMockingTestCase):
"[Time]\nNTP=192.168.2.1 192.168.2.2 0.mypool.org \n",
content.decode())
+ def test_write_ntp_config_template_defaults_pools_empty_lists_sles(self):
+ """write_ntp_config_template defaults pools servers upon empty config.
+
+ When both pools and servers are empty, default NR_POOL_SERVERS get
+ configured.
+ """
+ distro = 'sles'
+ mycloud = self._get_cloud(distro)
+ ntp_conf = self.tmp_path('ntp.conf', self.new_root) # Doesn't exist
+ # Create ntp.conf.tmpl
+ with open('{0}.tmpl'.format(ntp_conf), 'wb') as stream:
+ stream.write(NTP_TEMPLATE)
+ with mock.patch('cloudinit.config.cc_ntp.NTP_CONF', ntp_conf):
+ cc_ntp.write_ntp_config_template({}, mycloud, ntp_conf)
+ content = util.read_file_or_url('file://' + ntp_conf).contents
+ default_pools = [
+ "{0}.opensuse.pool.ntp.org".format(x)
+ for x in range(0, cc_ntp.NR_POOL_SERVERS)]
+ self.assertEqual(
+ "servers []\npools {0}\n".format(default_pools),
+ content.decode())
+ self.assertIn(
+ "Adding distro default ntp pool servers: {0}".format(
+ ",".join(default_pools)),
+ self.logs.getvalue())
+
# vi: ts=4 expandtab
diff --git a/tests/unittests/test_rh_subscription.py b/tests/unittests/test_rh_subscription.py
index e9d5702a..22718108 100644
--- a/tests/unittests/test_rh_subscription.py
+++ b/tests/unittests/test_rh_subscription.py
@@ -2,6 +2,7 @@
"""Tests for registering RHEL subscription via rh_subscription."""
+import copy
import logging
from cloudinit.config import cc_rh_subscription
@@ -68,6 +69,20 @@ class GoodTests(TestCase):
self.assertEqual(self.SM.log_success.call_count, 1)
self.assertEqual(self.SM._sub_man_cli.call_count, 2)
+ @mock.patch.object(cc_rh_subscription.SubscriptionManager, "_getRepos")
+ @mock.patch.object(cc_rh_subscription.SubscriptionManager, "_sub_man_cli")
+ def test_update_repos_disable_with_none(self, m_sub_man_cli, m_get_repos):
+ cfg = copy.deepcopy(self.config)
+ m_get_repos.return_value = ([], ['repo1'])
+ m_sub_man_cli.return_value = (b'', b'')
+ cfg['rh_subscription'].update(
+ {'enable-repo': ['repo1'], 'disable-repo': None})
+ mysm = cc_rh_subscription.SubscriptionManager(cfg)
+ self.assertEqual(True, mysm.update_repos())
+ m_get_repos.assert_called_with()
+ self.assertEqual(m_sub_man_cli.call_args_list,
+ [mock.call(['repos', '--enable=repo1'])])
+
def test_full_registration(self):
'''
Registration with auto-attach, service-level, adding pools,