summaryrefslogtreecommitdiff
path: root/tests/unittests/test_handler
diff options
context:
space:
mode:
authorScott Moser <smoser@ubuntu.com>2016-04-04 12:07:19 -0400
committerScott Moser <smoser@ubuntu.com>2016-04-04 12:07:19 -0400
commit7d8a3194552387fa9e21216bcd9a3bfc76fa2b04 (patch)
treec8dc45b013208a4e5e09e6ade63b3b5994f80aa3 /tests/unittests/test_handler
parent93f5af9f5075a416c65c1d0350c374e16f32f0d5 (diff)
parent210b041b2fead7a57af91f60a6f89d9e5aa1ed4a (diff)
downloadvyos-cloud-init-7d8a3194552387fa9e21216bcd9a3bfc76fa2b04.tar.gz
vyos-cloud-init-7d8a3194552387fa9e21216bcd9a3bfc76fa2b04.zip
merge with trunk
Diffstat (limited to 'tests/unittests/test_handler')
-rw-r--r--tests/unittests/test_handler/test_handler_apt_configure.py35
-rw-r--r--tests/unittests/test_handler/test_handler_ca_certs.py254
-rw-r--r--tests/unittests/test_handler/test_handler_chef.py18
-rw-r--r--tests/unittests/test_handler/test_handler_debug.py5
-rw-r--r--tests/unittests/test_handler/test_handler_disk_setup.py30
-rw-r--r--tests/unittests/test_handler/test_handler_growpart.py110
-rw-r--r--tests/unittests/test_handler/test_handler_locale.py11
-rw-r--r--tests/unittests/test_handler/test_handler_lxd.py75
-rw-r--r--tests/unittests/test_handler/test_handler_mounts.py133
-rw-r--r--tests/unittests/test_handler/test_handler_power_state.py47
-rw-r--r--tests/unittests/test_handler/test_handler_rsyslog.py174
-rw-r--r--tests/unittests/test_handler/test_handler_seed_random.py28
-rw-r--r--tests/unittests/test_handler/test_handler_set_hostname.py11
-rw-r--r--tests/unittests/test_handler/test_handler_snappy.py305
-rw-r--r--tests/unittests/test_handler/test_handler_timezone.py11
-rw-r--r--tests/unittests/test_handler/test_handler_yum_add_repo.py12
16 files changed, 1046 insertions, 213 deletions
diff --git a/tests/unittests/test_handler/test_handler_apt_configure.py b/tests/unittests/test_handler/test_handler_apt_configure.py
index 203dd2aa..1ed185ca 100644
--- a/tests/unittests/test_handler/test_handler_apt_configure.py
+++ b/tests/unittests/test_handler/test_handler_apt_configure.py
@@ -1,27 +1,30 @@
-from mocker import MockerTestCase
-
from cloudinit import util
from cloudinit.config import cc_apt_configure
+from ..helpers import TestCase
import os
import re
+import shutil
+import tempfile
+
+def load_tfile_or_url(*args, **kwargs):
+ return(util.decode_binary(util.read_file_or_url(*args, **kwargs).contents))
-class TestAptProxyConfig(MockerTestCase):
+
+class TestAptProxyConfig(TestCase):
def setUp(self):
super(TestAptProxyConfig, self).setUp()
- self.tmp = self.makeDir()
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
self.pfile = os.path.join(self.tmp, "proxy.cfg")
self.cfile = os.path.join(self.tmp, "config.cfg")
def _search_apt_config(self, contents, ptype, value):
- print(
- r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value),
- contents, "flags=re.IGNORECASE")
- return(re.search(
+ return re.search(
r"acquire::%s::proxy\s+[\"']%s[\"'];\n" % (ptype, value),
- contents, flags=re.IGNORECASE))
+ contents, flags=re.IGNORECASE)
def test_apt_proxy_written(self):
cfg = {'apt_proxy': 'myproxy'}
@@ -30,7 +33,7 @@ class TestAptProxyConfig(MockerTestCase):
self.assertTrue(os.path.isfile(self.pfile))
self.assertFalse(os.path.isfile(self.cfile))
- contents = str(util.read_file_or_url(self.pfile))
+ contents = load_tfile_or_url(self.pfile)
self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
def test_apt_http_proxy_written(self):
@@ -40,7 +43,7 @@ class TestAptProxyConfig(MockerTestCase):
self.assertTrue(os.path.isfile(self.pfile))
self.assertFalse(os.path.isfile(self.cfile))
- contents = str(util.read_file_or_url(self.pfile))
+ contents = load_tfile_or_url(self.pfile)
self.assertTrue(self._search_apt_config(contents, "http", "myproxy"))
def test_apt_all_proxy_written(self):
@@ -58,9 +61,9 @@ class TestAptProxyConfig(MockerTestCase):
self.assertTrue(os.path.isfile(self.pfile))
self.assertFalse(os.path.isfile(self.cfile))
- contents = str(util.read_file_or_url(self.pfile))
+ contents = load_tfile_or_url(self.pfile)
- for ptype, pval in values.iteritems():
+ for ptype, pval in values.items():
self.assertTrue(self._search_apt_config(contents, ptype, pval))
def test_proxy_deleted(self):
@@ -74,7 +77,7 @@ class TestAptProxyConfig(MockerTestCase):
cc_apt_configure.apply_apt_config({'apt_proxy': "foo"},
self.pfile, self.cfile)
self.assertTrue(os.path.isfile(self.pfile))
- contents = str(util.read_file_or_url(self.pfile))
+ contents = load_tfile_or_url(self.pfile)
self.assertTrue(self._search_apt_config(contents, "http", "foo"))
def test_config_written(self):
@@ -86,14 +89,14 @@ class TestAptProxyConfig(MockerTestCase):
self.assertTrue(os.path.isfile(self.cfile))
self.assertFalse(os.path.isfile(self.pfile))
- self.assertEqual(str(util.read_file_or_url(self.cfile)), payload)
+ self.assertEqual(load_tfile_or_url(self.cfile), payload)
def test_config_replaced(self):
util.write_file(self.pfile, "content doesnt matter")
cc_apt_configure.apply_apt_config({'apt_config': "foo"},
self.pfile, self.cfile)
self.assertTrue(os.path.isfile(self.cfile))
- self.assertEqual(str(util.read_file_or_url(self.cfile)), "foo")
+ self.assertEqual(load_tfile_or_url(self.cfile), "foo")
def test_config_deleted(self):
# if no 'apt_config' is provided, delete any previously written file
diff --git a/tests/unittests/test_handler/test_handler_ca_certs.py b/tests/unittests/test_handler/test_handler_ca_certs.py
index 0558023a..a6b9c0fd 100644
--- a/tests/unittests/test_handler/test_handler_ca_certs.py
+++ b/tests/unittests/test_handler/test_handler_ca_certs.py
@@ -1,15 +1,26 @@
-from mocker import MockerTestCase
-
from cloudinit import cloud
from cloudinit import helpers
from cloudinit import util
from cloudinit.config import cc_ca_certs
+from ..helpers import TestCase
import logging
+import shutil
+import tempfile
+import unittest
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
-class TestNoConfig(MockerTestCase):
+class TestNoConfig(unittest.TestCase):
def setUp(self):
super(TestNoConfig, self).setUp()
self.name = "ca-certs"
@@ -22,15 +33,20 @@ class TestNoConfig(MockerTestCase):
Test that nothing is done if no ca-certs configuration is provided.
"""
config = util.get_builtin_cfg()
- self.mocker.replace(util.write_file, passthrough=False)
- self.mocker.replace(cc_ca_certs.update_ca_certs, passthrough=False)
- self.mocker.replay()
+ with ExitStack() as mocks:
+ util_mock = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ certs_mock = mocks.enter_context(
+ mock.patch.object(cc_ca_certs, 'update_ca_certs'))
- cc_ca_certs.handle(self.name, config, self.cloud_init, self.log,
- self.args)
+ cc_ca_certs.handle(self.name, config, self.cloud_init, self.log,
+ self.args)
+ self.assertEqual(util_mock.call_count, 0)
+ self.assertEqual(certs_mock.call_count, 0)
-class TestConfig(MockerTestCase):
+
+class TestConfig(TestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.name = "ca-certs"
@@ -39,16 +55,16 @@ class TestConfig(MockerTestCase):
self.log = logging.getLogger("TestNoConfig")
self.args = []
- # Mock out the functions that actually modify the system
- self.mock_add = self.mocker.replace(cc_ca_certs.add_ca_certs,
- passthrough=False)
- self.mock_update = self.mocker.replace(cc_ca_certs.update_ca_certs,
- passthrough=False)
- self.mock_remove = self.mocker.replace(
- cc_ca_certs.remove_default_ca_certs, passthrough=False)
+ self.mocks = ExitStack()
+ self.addCleanup(self.mocks.close)
- # Order must be correct
- self.mocker.order()
+ # Mock out the functions that actually modify the system
+ self.mock_add = self.mocks.enter_context(
+ mock.patch.object(cc_ca_certs, 'add_ca_certs'))
+ self.mock_update = self.mocks.enter_context(
+ mock.patch.object(cc_ca_certs, 'update_ca_certs'))
+ self.mock_remove = self.mocks.enter_context(
+ mock.patch.object(cc_ca_certs, 'remove_default_ca_certs'))
def test_no_trusted_list(self):
"""
@@ -57,86 +73,88 @@ class TestConfig(MockerTestCase):
"""
config = {"ca-certs": {}}
- # No functions should be called
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_empty_trusted_list(self):
"""Test that no certificate are written if 'trusted' list is empty."""
config = {"ca-certs": {"trusted": []}}
- # No functions should be called
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_single_trusted(self):
"""Test that a single cert gets passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1"]}}
- self.mock_add(["CERT1"])
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.mock_add.assert_called_once_with(['CERT1'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_multiple_trusted(self):
"""Test that multiple certs get passed to add_ca_certs."""
config = {"ca-certs": {"trusted": ["CERT1", "CERT2"]}}
- self.mock_add(["CERT1", "CERT2"])
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.mock_add.assert_called_once_with(['CERT1', 'CERT2'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_remove_default_ca_certs(self):
"""Test remove_defaults works as expected."""
config = {"ca-certs": {"remove-defaults": True}}
- self.mock_remove()
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 1)
+
def test_no_remove_defaults_if_false(self):
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": False}}
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.assertEqual(self.mock_add.call_count, 0)
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 0)
+
def test_correct_order_for_remove_then_add(self):
"""Test remove_defaults is not called when config value is False."""
config = {"ca-certs": {"remove-defaults": True, "trusted": ["CERT1"]}}
- self.mock_remove()
- self.mock_add(["CERT1"])
- self.mock_update()
- self.mocker.replay()
-
cc_ca_certs.handle(self.name, config, self.cloud, self.log, self.args)
+ self.mock_add.assert_called_once_with(['CERT1'])
+ self.assertEqual(self.mock_update.call_count, 1)
+ self.assertEqual(self.mock_remove.call_count, 1)
-class TestAddCaCerts(MockerTestCase):
+
+class TestAddCaCerts(TestCase):
def setUp(self):
super(TestAddCaCerts, self).setUp()
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
self.paths = helpers.Paths({
- 'cloud_dir': self.makeDir()
+ 'cloud_dir': tmpdir,
})
def test_no_certs_in_list(self):
"""Test that no certificate are written if not provided."""
- self.mocker.replace(util.write_file, passthrough=False)
- self.mocker.replay()
- cc_ca_certs.add_ca_certs([])
+ with mock.patch.object(util, 'write_file') as mockobj:
+ cc_ca_certs.add_ca_certs([])
+ self.assertEqual(mockobj.call_count, 0)
def test_single_cert_trailing_cr(self):
"""Test adding a single certificate to the trusted CAs
@@ -146,19 +164,21 @@ class TestAddCaCerts(MockerTestCase):
ca_certs_content = "line1\nline2\ncloud-init-ca-certs.crt\nline3\n"
expected = "line1\nline2\nline3\ncloud-init-ca-certs.crt\n"
- mock_write = self.mocker.replace(util.write_file, passthrough=False)
- mock_load = self.mocker.replace(util.load_file, passthrough=False)
-
- mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0644)
-
- mock_load("/etc/ca-certificates.conf")
- self.mocker.result(ca_certs_content)
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- mock_write("/etc/ca-certificates.conf", expected, omode="wb")
- self.mocker.replay()
+ cc_ca_certs.add_ca_certs([cert])
- cc_ca_certs.add_ca_certs([cert])
+ mock_write.assert_has_calls([
+ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
+ cert, mode=0o644),
+ mock.call("/etc/ca-certificates.conf", expected, omode="wb"),
+ ])
+ mock_load.assert_called_once_with("/etc/ca-certificates.conf")
def test_single_cert_no_trailing_cr(self):
"""Test adding a single certificate to the trusted CAs
@@ -167,75 +187,89 @@ class TestAddCaCerts(MockerTestCase):
ca_certs_content = "line1\nline2\nline3"
- mock_write = self.mocker.replace(util.write_file, passthrough=False)
- mock_load = self.mocker.replace(util.load_file, passthrough=False)
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- cert, mode=0644)
+ cc_ca_certs.add_ca_certs([cert])
- mock_load("/etc/ca-certificates.conf")
- self.mocker.result(ca_certs_content)
+ mock_write.assert_has_calls([
+ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
+ cert, mode=0o644),
+ mock.call("/etc/ca-certificates.conf",
+ "%s\n%s\n" % (ca_certs_content,
+ "cloud-init-ca-certs.crt"),
+ omode="wb"),
+ ])
- mock_write("/etc/ca-certificates.conf",
- "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt"),
- omode="wb")
- self.mocker.replay()
-
- cc_ca_certs.add_ca_certs([cert])
+ mock_load.assert_called_once_with("/etc/ca-certificates.conf")
def test_multiple_certs(self):
"""Test adding multiple certificates to the trusted CAs."""
certs = ["CERT1\nLINE2\nLINE3", "CERT2\nLINE2\nLINE3"]
expected_cert_file = "\n".join(certs)
-
- mock_write = self.mocker.replace(util.write_file, passthrough=False)
- mock_load = self.mocker.replace(util.load_file, passthrough=False)
-
- mock_write("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
- expected_cert_file, mode=0644)
-
ca_certs_content = "line1\nline2\nline3"
- mock_load("/etc/ca-certificates.conf")
- self.mocker.result(ca_certs_content)
- out = "%s\n%s\n" % (ca_certs_content, "cloud-init-ca-certs.crt")
- mock_write("/etc/ca-certificates.conf", out, omode="wb")
+ with ExitStack() as mocks:
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_load = mocks.enter_context(
+ mock.patch.object(util, 'load_file',
+ return_value=ca_certs_content))
- self.mocker.replay()
+ cc_ca_certs.add_ca_certs(certs)
- cc_ca_certs.add_ca_certs(certs)
+ mock_write.assert_has_calls([
+ mock.call("/usr/share/ca-certificates/cloud-init-ca-certs.crt",
+ expected_cert_file, mode=0o644),
+ mock.call("/etc/ca-certificates.conf",
+ "%s\n%s\n" % (ca_certs_content,
+ "cloud-init-ca-certs.crt"),
+ omode='wb'),
+ ])
+ mock_load.assert_called_once_with("/etc/ca-certificates.conf")
-class TestUpdateCaCerts(MockerTestCase):
- def test_commands(self):
- mock_check_call = self.mocker.replace(util.subp,
- passthrough=False)
- mock_check_call(["update-ca-certificates"], capture=False)
- self.mocker.replay()
- cc_ca_certs.update_ca_certs()
+class TestUpdateCaCerts(unittest.TestCase):
+ def test_commands(self):
+ with mock.patch.object(util, 'subp') as mockobj:
+ cc_ca_certs.update_ca_certs()
+ mockobj.assert_called_once_with(
+ ["update-ca-certificates"], capture=False)
-class TestRemoveDefaultCaCerts(MockerTestCase):
+class TestRemoveDefaultCaCerts(TestCase):
def setUp(self):
super(TestRemoveDefaultCaCerts, self).setUp()
+ tmpdir = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, tmpdir)
self.paths = helpers.Paths({
- 'cloud_dir': self.makeDir()
+ 'cloud_dir': tmpdir,
})
def test_commands(self):
- mock_delete_dir_contents = self.mocker.replace(
- util.delete_dir_contents, passthrough=False)
- mock_write = self.mocker.replace(util.write_file, passthrough=False)
- mock_subp = self.mocker.replace(util.subp,
- passthrough=False)
-
- mock_delete_dir_contents("/usr/share/ca-certificates/")
- mock_delete_dir_contents("/etc/ssl/certs/")
- mock_write("/etc/ca-certificates.conf", "", mode=0644)
- mock_subp(('debconf-set-selections', '-'),
- "ca-certificates ca-certificates/trust_new_crts select no")
- self.mocker.replay()
-
- cc_ca_certs.remove_default_ca_certs()
+ with ExitStack() as mocks:
+ mock_delete = mocks.enter_context(
+ mock.patch.object(util, 'delete_dir_contents'))
+ mock_write = mocks.enter_context(
+ mock.patch.object(util, 'write_file'))
+ mock_subp = mocks.enter_context(mock.patch.object(util, 'subp'))
+
+ cc_ca_certs.remove_default_ca_certs()
+
+ mock_delete.assert_has_calls([
+ mock.call("/usr/share/ca-certificates/"),
+ mock.call("/etc/ssl/certs/"),
+ ])
+
+ mock_write.assert_called_once_with(
+ "/etc/ca-certificates.conf", "", mode=0o644)
+
+ mock_subp.assert_called_once_with(
+ ('debconf-set-selections', '-'),
+ "ca-certificates ca-certificates/trust_new_crts select no")
diff --git a/tests/unittests/test_handler/test_handler_chef.py b/tests/unittests/test_handler/test_handler_chef.py
index ef1aa208..edad88cb 100644
--- a/tests/unittests/test_handler/test_handler_chef.py
+++ b/tests/unittests/test_handler/test_handler_chef.py
@@ -11,15 +11,21 @@ from cloudinit.sources import DataSourceNone
from .. import helpers as t_help
+import six
import logging
+import shutil
+import tempfile
LOG = logging.getLogger(__name__)
+CLIENT_TEMPL = os.path.sep.join(["templates", "chef_client.rb.tmpl"])
+
class TestChef(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestChef, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def fetch_cloud(self, distro_kind):
cls = distros.fetch(distro_kind)
@@ -37,9 +43,13 @@ class TestChef(t_help.FilesystemMockingTestCase):
for d in cc_chef.CHEF_DIRS:
self.assertFalse(os.path.isdir(d))
+ @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL),
+ CLIENT_TEMPL + " is not available")
def test_basic_config(self):
- # This should create a file of the format...
"""
+ test basic config looks sane
+
+ # This should create a file of the format...
# Created by cloud-init v. 0.7.6 on Sat, 11 Oct 2014 23:57:21 +0000
log_level :info
ssl_verify_mode :verify_none
@@ -74,7 +84,7 @@ class TestChef(t_help.FilesystemMockingTestCase):
for k, v in cfg['chef'].items():
self.assertIn(v, c)
for k, v in cc_chef.CHEF_RB_TPL_DEFAULTS.items():
- if isinstance(v, basestring):
+ if isinstance(v, six.string_types):
self.assertIn(v, c)
c = util.load_file(cc_chef.CHEF_FB_PATH)
self.assertEqual({}, json.loads(c))
@@ -101,6 +111,8 @@ class TestChef(t_help.FilesystemMockingTestCase):
'c': 'd',
}, json.loads(c))
+ @t_help.skipIf(not os.path.isfile(CLIENT_TEMPL),
+ CLIENT_TEMPL + " is not available")
def test_template_deletes(self):
tpl_file = util.load_file('templates/chef_client.rb.tmpl')
self.patchUtils(self.tmp)
diff --git a/tests/unittests/test_handler/test_handler_debug.py b/tests/unittests/test_handler/test_handler_debug.py
index 8891ca04..80708d7b 100644
--- a/tests/unittests/test_handler/test_handler_debug.py
+++ b/tests/unittests/test_handler/test_handler_debug.py
@@ -26,6 +26,8 @@ from cloudinit.sources import DataSourceNone
from .. import helpers as t_help
import logging
+import shutil
+import tempfile
LOG = logging.getLogger(__name__)
@@ -33,7 +35,8 @@ LOG = logging.getLogger(__name__)
class TestDebug(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestDebug, self).setUp()
- self.new_root = self.makeDir(prefix="unittest_")
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
def _get_cloud(self, distro, metadata=None):
self.patchUtils(self.new_root)
diff --git a/tests/unittests/test_handler/test_handler_disk_setup.py b/tests/unittests/test_handler/test_handler_disk_setup.py
new file mode 100644
index 00000000..ddef8d48
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_disk_setup.py
@@ -0,0 +1,30 @@
+from cloudinit.config import cc_disk_setup
+from ..helpers import ExitStack, mock, TestCase
+
+
+class TestIsDiskUsed(TestCase):
+
+ def setUp(self):
+ super(TestIsDiskUsed, self).setUp()
+ self.patches = ExitStack()
+ mod_name = 'cloudinit.config.cc_disk_setup'
+ self.enumerate_disk = self.patches.enter_context(
+ mock.patch('{0}.enumerate_disk'.format(mod_name)))
+ self.check_fs = self.patches.enter_context(
+ mock.patch('{0}.check_fs'.format(mod_name)))
+
+ def test_multiple_child_nodes_returns_true(self):
+ self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(2))
+ self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock())
+ self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock()))
+
+ def test_valid_filesystem_returns_true(self):
+ self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1))
+ self.check_fs.return_value = (
+ mock.MagicMock(), 'ext4', mock.MagicMock())
+ self.assertTrue(cc_disk_setup.is_disk_used(mock.MagicMock()))
+
+ def test_one_child_nodes_and_no_fs_returns_false(self):
+ self.enumerate_disk.return_value = (mock.MagicMock() for _ in range(1))
+ self.check_fs.return_value = (mock.MagicMock(), None, mock.MagicMock())
+ self.assertFalse(cc_disk_setup.is_disk_used(mock.MagicMock()))
diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py
index 5d0636d1..bef0d80d 100644
--- a/tests/unittests/test_handler/test_handler_growpart.py
+++ b/tests/unittests/test_handler/test_handler_growpart.py
@@ -1,14 +1,23 @@
-from mocker import MockerTestCase
-
from cloudinit import cloud
from cloudinit import util
from cloudinit.config import cc_growpart
+from ..helpers import TestCase
import errno
import logging
import os
import re
+import unittest
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+try:
+ from contextlib import ExitStack
+except ImportError:
+ from contextlib2 import ExitStack
# growpart:
# mode: auto # off, on, auto, 'growpart'
@@ -42,7 +51,7 @@ growpart disk partition
"""
-class TestDisabled(MockerTestCase):
+class TestDisabled(unittest.TestCase):
def setUp(self):
super(TestDisabled, self).setUp()
self.name = "growpart"
@@ -57,14 +66,14 @@ class TestDisabled(MockerTestCase):
# this really only verifies that resizer_factory isn't called
config = {'growpart': {'mode': 'off'}}
- self.mocker.replace(cc_growpart.resizer_factory,
- passthrough=False)
- self.mocker.replay()
- self.handle(self.name, config, self.cloud_init, self.log, self.args)
+ with mock.patch.object(cc_growpart, 'resizer_factory') as mockobj:
+ self.handle(self.name, config, self.cloud_init, self.log,
+ self.args)
+ self.assertEqual(mockobj.call_count, 0)
-class TestConfig(MockerTestCase):
+class TestConfig(TestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.name = "growpart"
@@ -77,75 +86,76 @@ class TestConfig(MockerTestCase):
self.cloud_init = None
self.handle = cc_growpart.handle
- # Order must be correct
- self.mocker.order()
-
def test_no_resizers_auto_is_fine(self):
- subp = self.mocker.replace(util.subp, passthrough=False)
- subp(['growpart', '--help'], env={'LANG': 'C'})
- self.mocker.result((HELP_GROWPART_NO_RESIZE, ""))
- self.mocker.replay()
+ with mock.patch.object(
+ util, 'subp',
+ return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj:
- config = {'growpart': {'mode': 'auto'}}
- self.handle(self.name, config, self.cloud_init, self.log, self.args)
+ config = {'growpart': {'mode': 'auto'}}
+ self.handle(self.name, config, self.cloud_init, self.log,
+ self.args)
+
+ mockobj.assert_called_once_with(
+ ['growpart', '--help'], env={'LANG': 'C'})
def test_no_resizers_mode_growpart_is_exception(self):
- subp = self.mocker.replace(util.subp, passthrough=False)
- subp(['growpart', '--help'], env={'LANG': 'C'})
- self.mocker.result((HELP_GROWPART_NO_RESIZE, ""))
- self.mocker.replay()
+ with mock.patch.object(
+ util, 'subp',
+ return_value=(HELP_GROWPART_NO_RESIZE, "")) as mockobj:
+ config = {'growpart': {'mode': "growpart"}}
+ self.assertRaises(
+ ValueError, self.handle, self.name, config,
+ self.cloud_init, self.log, self.args)
- config = {'growpart': {'mode': "growpart"}}
- self.assertRaises(ValueError, self.handle, self.name, config,
- self.cloud_init, self.log, self.args)
+ mockobj.assert_called_once_with(
+ ['growpart', '--help'], env={'LANG': 'C'})
def test_mode_auto_prefers_growpart(self):
- subp = self.mocker.replace(util.subp, passthrough=False)
- subp(['growpart', '--help'], env={'LANG': 'C'})
- self.mocker.result((HELP_GROWPART_RESIZE, ""))
- self.mocker.replay()
+ with mock.patch.object(
+ util, 'subp',
+ return_value=(HELP_GROWPART_RESIZE, "")) as mockobj:
+ ret = cc_growpart.resizer_factory(mode="auto")
+ self.assertIsInstance(ret, cc_growpart.ResizeGrowPart)
- ret = cc_growpart.resizer_factory(mode="auto")
- self.assertTrue(isinstance(ret, cc_growpart.ResizeGrowPart))
+ mockobj.assert_called_once_with(
+ ['growpart', '--help'], env={'LANG': 'C'})
def test_handle_with_no_growpart_entry(self):
# if no 'growpart' entry in config, then mode=auto should be used
myresizer = object()
+ retval = (("/", cc_growpart.RESIZE.CHANGED, "my-message",),)
+
+ with ExitStack() as mocks:
+ factory = mocks.enter_context(
+ mock.patch.object(cc_growpart, 'resizer_factory',
+ return_value=myresizer))
+ rsdevs = mocks.enter_context(
+ mock.patch.object(cc_growpart, 'resize_devices',
+ return_value=retval))
+ mocks.enter_context(
+ mock.patch.object(cc_growpart, 'RESIZERS',
+ (('mysizer', object),)
+ ))
- factory = self.mocker.replace(cc_growpart.resizer_factory,
- passthrough=False)
- rsdevs = self.mocker.replace(cc_growpart.resize_devices,
- passthrough=False)
- factory("auto")
- self.mocker.result(myresizer)
- rsdevs(myresizer, ["/"])
- self.mocker.result((("/", cc_growpart.RESIZE.CHANGED, "my-message",),))
- self.mocker.replay()
-
- try:
- orig_resizers = cc_growpart.RESIZERS
- cc_growpart.RESIZERS = (('mysizer', object),)
self.handle(self.name, {}, self.cloud_init, self.log, self.args)
- finally:
- cc_growpart.RESIZERS = orig_resizers
+ factory.assert_called_once_with('auto')
+ rsdevs.assert_called_once_with(myresizer, ['/'])
-class TestResize(MockerTestCase):
+
+class TestResize(unittest.TestCase):
def setUp(self):
super(TestResize, self).setUp()
self.name = "growpart"
self.log = logging.getLogger("TestResize")
- # Order must be correct
- self.mocker.order()
-
def test_simple_devices(self):
# test simple device list
# this patches out devent2dev, os.stat, and device_part_info
# so in the end, doesn't test a lot
devs = ["/dev/XXda1", "/dev/YYda2"]
- devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5L,
+ devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5,
st_nlink=1, st_uid=0, st_gid=6, st_size=0,
st_atime=0, st_mtime=0, st_ctime=0)
enoent = ["/dev/NOENT"]
diff --git a/tests/unittests/test_handler/test_handler_locale.py b/tests/unittests/test_handler/test_handler_locale.py
index eb251636..de85eff6 100644
--- a/tests/unittests/test_handler/test_handler_locale.py
+++ b/tests/unittests/test_handler/test_handler_locale.py
@@ -29,9 +29,11 @@ from .. import helpers as t_help
from configobj import ConfigObj
-from StringIO import StringIO
+from six import BytesIO
import logging
+import shutil
+import tempfile
LOG = logging.getLogger(__name__)
@@ -39,7 +41,8 @@ LOG = logging.getLogger(__name__)
class TestLocale(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestLocale, self).setUp()
- self.new_root = self.makeDir(prefix="unittest_")
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
def _get_cloud(self, distro):
self.patchUtils(self.new_root)
@@ -59,6 +62,6 @@ class TestLocale(t_help.FilesystemMockingTestCase):
cc = self._get_cloud('sles')
cc_locale.handle('cc_locale', cfg, cc, LOG, [])
- contents = util.load_file('/etc/sysconfig/language')
- n_cfg = ConfigObj(StringIO(contents))
+ contents = util.load_file('/etc/sysconfig/language', decode=False)
+ n_cfg = ConfigObj(BytesIO(contents))
self.assertEquals({'RC_LANG': cfg['locale']}, dict(n_cfg))
diff --git a/tests/unittests/test_handler/test_handler_lxd.py b/tests/unittests/test_handler/test_handler_lxd.py
new file mode 100644
index 00000000..7ffa2a53
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_lxd.py
@@ -0,0 +1,75 @@
+from cloudinit.config import cc_lxd
+from cloudinit import (distros, helpers, cloud)
+from cloudinit.sources import DataSourceNoCloud
+from .. import helpers as t_help
+
+import logging
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+LOG = logging.getLogger(__name__)
+
+
+class TestLxd(t_help.TestCase):
+ lxd_cfg = {
+ 'lxd': {
+ 'init': {
+ 'network_address': '0.0.0.0',
+ 'storage_backend': 'zfs',
+ 'storage_pool': 'poolname',
+ }
+ }
+ }
+
+ def setUp(self):
+ super(TestLxd, self).setUp()
+
+ def _get_cloud(self, distro):
+ cls = distros.fetch(distro)
+ paths = helpers.Paths({})
+ d = cls(distro, {}, paths)
+ ds = DataSourceNoCloud.DataSourceNoCloud({}, d, paths)
+ cc = cloud.Cloud(ds, paths, {}, d, None)
+ return cc
+
+ @mock.patch("cloudinit.config.cc_lxd.util")
+ def test_lxd_init(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ mock_util.which.return_value = True
+ cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, [])
+ self.assertTrue(mock_util.which.called)
+ init_call = mock_util.subp.call_args_list[0][0][0]
+ self.assertEquals(init_call,
+ ['lxd', 'init', '--auto',
+ '--network-address=0.0.0.0',
+ '--storage-backend=zfs',
+ '--storage-pool=poolname'])
+
+ @mock.patch("cloudinit.config.cc_lxd.util")
+ def test_lxd_install(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ mock_util.which.return_value = None
+ cc_lxd.handle('cc_lxd', self.lxd_cfg, cc, LOG, [])
+ self.assertTrue(cc.distro.install_packages.called)
+ install_pkg = cc.distro.install_packages.call_args_list[0][0][0]
+ self.assertEquals(sorted(install_pkg), ['lxd', 'zfs'])
+
+ @mock.patch("cloudinit.config.cc_lxd.util")
+ def test_no_init_does_nothing(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc_lxd.handle('cc_lxd', {'lxd': {}}, cc, LOG, [])
+ self.assertFalse(cc.distro.install_packages.called)
+ self.assertFalse(mock_util.subp.called)
+
+ @mock.patch("cloudinit.config.cc_lxd.util")
+ def test_no_lxd_does_nothing(self, mock_util):
+ cc = self._get_cloud('ubuntu')
+ cc.distro = mock.MagicMock()
+ cc_lxd.handle('cc_lxd', {'package_update': True}, cc, LOG, [])
+ self.assertFalse(cc.distro.install_packages.called)
+ self.assertFalse(mock_util.subp.called)
diff --git a/tests/unittests/test_handler/test_handler_mounts.py b/tests/unittests/test_handler/test_handler_mounts.py
new file mode 100644
index 00000000..355674b2
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_mounts.py
@@ -0,0 +1,133 @@
+import os.path
+import shutil
+import tempfile
+
+from cloudinit.config import cc_mounts
+
+from .. import helpers as test_helpers
+
+try:
+ from unittest import mock
+except ImportError:
+ import mock
+
+
+class TestSanitizeDevname(test_helpers.FilesystemMockingTestCase):
+
+ def setUp(self):
+ super(TestSanitizeDevname, self).setUp()
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
+ self.patchOS(self.new_root)
+
+ def _touch(self, path):
+ path = os.path.join(self.new_root, path.lstrip('/'))
+ basedir = os.path.dirname(path)
+ if not os.path.exists(basedir):
+ os.makedirs(basedir)
+ open(path, 'a').close()
+
+ def _makedirs(self, directory):
+ directory = os.path.join(self.new_root, directory.lstrip('/'))
+ if not os.path.exists(directory):
+ os.makedirs(directory)
+
+ def mock_existence_of_disk(self, disk_path):
+ self._touch(disk_path)
+ self._makedirs(os.path.join('/sys/block', disk_path.split('/')[-1]))
+
+ def mock_existence_of_partition(self, disk_path, partition_number):
+ self.mock_existence_of_disk(disk_path)
+ self._touch(disk_path + str(partition_number))
+ disk_name = disk_path.split('/')[-1]
+ self._makedirs(os.path.join('/sys/block',
+ disk_name,
+ disk_name + str(partition_number)))
+
+ def test_existent_full_disk_path_is_returned(self):
+ disk_path = '/dev/sda'
+ self.mock_existence_of_disk(disk_path)
+ self.assertEqual(disk_path,
+ cc_mounts.sanitize_devname(disk_path,
+ lambda x: None,
+ mock.Mock()))
+
+ def test_existent_disk_name_returns_full_path(self):
+ disk_name = 'sda'
+ disk_path = '/dev/' + disk_name
+ self.mock_existence_of_disk(disk_path)
+ self.assertEqual(disk_path,
+ cc_mounts.sanitize_devname(disk_name,
+ lambda x: None,
+ mock.Mock()))
+
+ def test_existent_meta_disk_is_returned(self):
+ actual_disk_path = '/dev/sda'
+ self.mock_existence_of_disk(actual_disk_path)
+ self.assertEqual(
+ actual_disk_path,
+ cc_mounts.sanitize_devname('ephemeral0',
+ lambda x: actual_disk_path,
+ mock.Mock()))
+
+ def test_existent_meta_partition_is_returned(self):
+ disk_name, partition_part = '/dev/sda', '1'
+ actual_partition_path = disk_name + partition_part
+ self.mock_existence_of_partition(disk_name, partition_part)
+ self.assertEqual(
+ actual_partition_path,
+ cc_mounts.sanitize_devname('ephemeral0.1',
+ lambda x: disk_name,
+ mock.Mock()))
+
+ def test_existent_meta_partition_with_p_is_returned(self):
+ disk_name, partition_part = '/dev/sda', 'p1'
+ actual_partition_path = disk_name + partition_part
+ self.mock_existence_of_partition(disk_name, partition_part)
+ self.assertEqual(
+ actual_partition_path,
+ cc_mounts.sanitize_devname('ephemeral0.1',
+ lambda x: disk_name,
+ mock.Mock()))
+
+ def test_first_partition_returned_if_existent_disk_is_partitioned(self):
+ disk_name, partition_part = '/dev/sda', '1'
+ actual_partition_path = disk_name + partition_part
+ self.mock_existence_of_partition(disk_name, partition_part)
+ self.assertEqual(
+ actual_partition_path,
+ cc_mounts.sanitize_devname('ephemeral0',
+ lambda x: disk_name,
+ mock.Mock()))
+
+ def test_nth_partition_returned_if_requested(self):
+ disk_name, partition_part = '/dev/sda', '3'
+ actual_partition_path = disk_name + partition_part
+ self.mock_existence_of_partition(disk_name, partition_part)
+ self.assertEqual(
+ actual_partition_path,
+ cc_mounts.sanitize_devname('ephemeral0.3',
+ lambda x: disk_name,
+ mock.Mock()))
+
+ def test_transformer_returning_none_returns_none(self):
+ self.assertIsNone(
+ cc_mounts.sanitize_devname(
+ 'ephemeral0', lambda x: None, mock.Mock()))
+
+ def test_missing_device_returns_none(self):
+ self.assertIsNone(
+ cc_mounts.sanitize_devname('/dev/sda', None, mock.Mock()))
+
+ def test_missing_sys_returns_none(self):
+ disk_path = '/dev/sda'
+ self._makedirs(disk_path)
+ self.assertIsNone(
+ cc_mounts.sanitize_devname(disk_path, None, mock.Mock()))
+
+ def test_existent_disk_but_missing_partition_returns_none(self):
+ disk_path = '/dev/sda'
+ self.mock_existence_of_disk(disk_path)
+ self.assertIsNone(
+ cc_mounts.sanitize_devname(
+ 'ephemeral0.1', lambda x: disk_path, mock.Mock()))
diff --git a/tests/unittests/test_handler/test_handler_power_state.py b/tests/unittests/test_handler/test_handler_power_state.py
index 2f86b8f8..04ce5687 100644
--- a/tests/unittests/test_handler/test_handler_power_state.py
+++ b/tests/unittests/test_handler/test_handler_power_state.py
@@ -1,6 +1,9 @@
+import sys
+
from cloudinit.config import cc_power_state_change as psc
from .. import helpers as t_help
+from ..helpers import mock
class TestLoadPowerState(t_help.TestCase):
@@ -9,12 +12,12 @@ class TestLoadPowerState(t_help.TestCase):
def test_no_config(self):
# completely empty config should mean do nothing
- (cmd, _timeout) = psc.load_power_state({})
+ (cmd, _timeout, _condition) = psc.load_power_state({})
self.assertEqual(cmd, None)
def test_irrelevant_config(self):
# no power_state field in config should return None for cmd
- (cmd, _timeout) = psc.load_power_state({'foo': 'bar'})
+ (cmd, _timeout, _condition) = psc.load_power_state({'foo': 'bar'})
self.assertEqual(cmd, None)
def test_invalid_mode(self):
@@ -53,23 +56,59 @@ class TestLoadPowerState(t_help.TestCase):
def test_no_message(self):
# if message is not present, then no argument should be passed for it
cfg = {'power_state': {'mode': 'poweroff'}}
- (cmd, _timeout) = psc.load_power_state(cfg)
+ (cmd, _timeout, _condition) = psc.load_power_state(cfg)
self.assertNotIn("", cmd)
check_lps_ret(psc.load_power_state(cfg))
self.assertTrue(len(cmd) == 3)
+ def test_condition_null_raises(self):
+ cfg = {'power_state': {'mode': 'poweroff', 'condition': None}}
+ self.assertRaises(TypeError, psc.load_power_state, cfg)
+
+ def test_condition_default_is_true(self):
+ cfg = {'power_state': {'mode': 'poweroff'}}
+ _cmd, _timeout, cond = psc.load_power_state(cfg)
+ self.assertEqual(cond, True)
+
+
+class TestCheckCondition(t_help.TestCase):
+ def cmd_with_exit(self, rc):
+ return([sys.executable, '-c', 'import sys; sys.exit(%s)' % rc])
+
+ def test_true_is_true(self):
+ self.assertEqual(psc.check_condition(True), True)
+
+ def test_false_is_false(self):
+ self.assertEqual(psc.check_condition(False), False)
+
+ def test_cmd_exit_zero_true(self):
+ self.assertEqual(psc.check_condition(self.cmd_with_exit(0)), True)
+
+ def test_cmd_exit_one_false(self):
+ self.assertEqual(psc.check_condition(self.cmd_with_exit(1)), False)
+
+ def test_cmd_exit_nonzero_warns(self):
+ mocklog = mock.Mock()
+ self.assertEqual(
+ psc.check_condition(self.cmd_with_exit(2), mocklog), False)
+ self.assertEqual(mocklog.warn.call_count, 1)
+
def check_lps_ret(psc_return, mode=None):
- if len(psc_return) != 2:
+ if len(psc_return) != 3:
raise TypeError("length returned = %d" % len(psc_return))
errs = []
cmd = psc_return[0]
timeout = psc_return[1]
+ condition = psc_return[2]
if 'shutdown' not in psc_return[0][0]:
errs.append("string 'shutdown' not in cmd")
+ if condition is None:
+ errs.append("condition was not returned")
+
if mode is not None:
opt = {'halt': '-H', 'poweroff': '-P', 'reboot': '-r'}[mode]
if opt not in psc_return[0]:
diff --git a/tests/unittests/test_handler/test_handler_rsyslog.py b/tests/unittests/test_handler/test_handler_rsyslog.py
new file mode 100644
index 00000000..b932165c
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_rsyslog.py
@@ -0,0 +1,174 @@
+import os
+import shutil
+import tempfile
+
+from cloudinit.config.cc_rsyslog import (
+ apply_rsyslog_changes, DEF_DIR, DEF_FILENAME, DEF_RELOAD, load_config,
+ parse_remotes_line, remotes_to_rsyslog_cfg)
+from cloudinit import util
+
+from .. import helpers as t_help
+
+
+class TestLoadConfig(t_help.TestCase):
+ def setUp(self):
+ super(TestLoadConfig, self).setUp()
+ self.basecfg = {
+ 'config_filename': DEF_FILENAME,
+ 'config_dir': DEF_DIR,
+ 'service_reload_command': DEF_RELOAD,
+ 'configs': [],
+ 'remotes': {},
+ }
+
+ def test_legacy_full(self):
+ found = load_config({
+ 'rsyslog': ['*.* @192.168.1.1'],
+ 'rsyslog_dir': "mydir",
+ 'rsyslog_filename': "myfilename"})
+ self.basecfg.update({
+ 'configs': ['*.* @192.168.1.1'],
+ 'config_dir': "mydir",
+ 'config_filename': 'myfilename',
+ 'service_reload_command': 'auto'}
+ )
+
+ self.assertEqual(found, self.basecfg)
+
+ def test_legacy_defaults(self):
+ found = load_config({
+ 'rsyslog': ['*.* @192.168.1.1']})
+ self.basecfg.update({
+ 'configs': ['*.* @192.168.1.1']})
+ self.assertEqual(found, self.basecfg)
+
+ def test_new_defaults(self):
+ self.assertEqual(load_config({}), self.basecfg)
+
+ def test_new_configs(self):
+ cfgs = ['*.* myhost', '*.* my2host']
+ self.basecfg.update({'configs': cfgs})
+ self.assertEqual(
+ load_config({'rsyslog': {'configs': cfgs}}),
+ self.basecfg)
+
+
+class TestApplyChanges(t_help.TestCase):
+ def setUp(self):
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
+
+ def test_simple(self):
+ cfgline = "*.* foohost"
+ changed = apply_rsyslog_changes(
+ configs=[cfgline], def_fname="foo.cfg", cfg_dir=self.tmp)
+
+ fname = os.path.join(self.tmp, "foo.cfg")
+ self.assertEqual([fname], changed)
+ self.assertEqual(
+ util.load_file(fname), cfgline + "\n")
+
+ def test_multiple_files(self):
+ configs = [
+ '*.* foohost',
+ {'content': 'abc', 'filename': 'my.cfg'},
+ {'content': 'filefoo-content',
+ 'filename': os.path.join(self.tmp, 'mydir/mycfg')},
+ ]
+
+ changed = apply_rsyslog_changes(
+ configs=configs, def_fname="default.cfg", cfg_dir=self.tmp)
+
+ expected = [
+ (os.path.join(self.tmp, "default.cfg"),
+ "*.* foohost\n"),
+ (os.path.join(self.tmp, "my.cfg"), "abc\n"),
+ (os.path.join(self.tmp, "mydir/mycfg"), "filefoo-content\n"),
+ ]
+ self.assertEqual([f[0] for f in expected], changed)
+ actual = []
+ for fname, _content in expected:
+ util.load_file(fname)
+ actual.append((fname, util.load_file(fname),))
+ self.assertEqual(expected, actual)
+
+ def test_repeat_def(self):
+ configs = ['*.* foohost', "*.warn otherhost"]
+
+ changed = apply_rsyslog_changes(
+ configs=configs, def_fname="default.cfg", cfg_dir=self.tmp)
+
+ fname = os.path.join(self.tmp, "default.cfg")
+ self.assertEqual([fname], changed)
+
+ expected_content = '\n'.join([c for c in configs]) + '\n'
+ found_content = util.load_file(fname)
+ self.assertEqual(expected_content, found_content)
+
+ def test_multiline_content(self):
+ configs = ['line1', 'line2\nline3\n']
+
+ apply_rsyslog_changes(
+ configs=configs, def_fname="default.cfg", cfg_dir=self.tmp)
+
+ fname = os.path.join(self.tmp, "default.cfg")
+ expected_content = '\n'.join([c for c in configs])
+ found_content = util.load_file(fname)
+ self.assertEqual(expected_content, found_content)
+
+
+class TestParseRemotesLine(t_help.TestCase):
+ def test_valid_port(self):
+ r = parse_remotes_line("foo:9")
+ self.assertEqual(9, r.port)
+
+ def test_invalid_port(self):
+ with self.assertRaises(ValueError):
+ parse_remotes_line("*.* foo:abc")
+
+ def test_valid_ipv6(self):
+ r = parse_remotes_line("*.* [::1]")
+ self.assertEqual("*.* @[::1]", str(r))
+
+ def test_valid_ipv6_with_port(self):
+ r = parse_remotes_line("*.* [::1]:100")
+ self.assertEqual(r.port, 100)
+ self.assertEqual(r.addr, "::1")
+ self.assertEqual("*.* @[::1]:100", str(r))
+
+ def test_invalid_multiple_colon(self):
+ with self.assertRaises(ValueError):
+ parse_remotes_line("*.* ::1:100")
+
+ def test_name_in_string(self):
+ r = parse_remotes_line("syslog.host", name="foobar")
+ self.assertEqual("*.* @syslog.host # foobar", str(r))
+
+
+class TestRemotesToSyslog(t_help.TestCase):
+ def test_simple(self):
+ # str rendered line must appear in remotes_to_ryslog_cfg return
+ mycfg = "*.* myhost"
+ myline = str(parse_remotes_line(mycfg, name="myname"))
+ r = remotes_to_rsyslog_cfg({'myname': mycfg})
+ lines = r.splitlines()
+ self.assertEqual(1, len(lines))
+ self.assertTrue(myline in r.splitlines())
+
+ def test_header_footer(self):
+ header = "#foo head"
+ footer = "#foo foot"
+ r = remotes_to_rsyslog_cfg(
+ {'myname': "*.* myhost"}, header=header, footer=footer)
+ lines = r.splitlines()
+ self.assertTrue(header, lines[0])
+ self.assertTrue(footer, lines[-1])
+
+ def test_with_empty_or_null(self):
+ mycfg = "*.* myhost"
+ myline = str(parse_remotes_line(mycfg, name="myname"))
+ r = remotes_to_rsyslog_cfg(
+ {'myname': mycfg, 'removed': None, 'removed2': ""})
+ lines = r.splitlines()
+ self.assertEqual(1, len(lines))
+ self.assertTrue(myline in r.splitlines())
diff --git a/tests/unittests/test_handler/test_handler_seed_random.py b/tests/unittests/test_handler/test_handler_seed_random.py
index 40481f16..98bc9b81 100644
--- a/tests/unittests/test_handler/test_handler_seed_random.py
+++ b/tests/unittests/test_handler/test_handler_seed_random.py
@@ -18,11 +18,10 @@
from cloudinit.config import cc_seed_random
-import base64
import gzip
import tempfile
-from StringIO import StringIO
+from six import BytesIO
from cloudinit import cloud
from cloudinit import distros
@@ -69,7 +68,7 @@ class TestRandomSeed(t_help.TestCase):
return
def _compress(self, text):
- contents = StringIO()
+ contents = BytesIO()
gz_fh = gzip.GzipFile(mode='wb', fileobj=contents)
gz_fh.write(text)
gz_fh.close()
@@ -96,7 +95,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("tiny-tim-was-here", contents)
def test_append_random_unknown_encoding(self):
- data = self._compress("tiny-toe")
+ data = self._compress(b"tiny-toe")
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -108,7 +107,7 @@ class TestRandomSeed(t_help.TestCase):
self._get_cloud('ubuntu'), LOG, [])
def test_append_random_gzip(self):
- data = self._compress("tiny-toe")
+ data = self._compress(b"tiny-toe")
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -121,7 +120,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("tiny-toe", contents)
def test_append_random_gz(self):
- data = self._compress("big-toe")
+ data = self._compress(b"big-toe")
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -134,7 +133,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("big-toe", contents)
def test_append_random_base64(self):
- data = base64.b64encode('bubbles')
+ data = util.b64e('bubbles')
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -147,7 +146,7 @@ class TestRandomSeed(t_help.TestCase):
self.assertEquals("bubbles", contents)
def test_append_random_b64(self):
- data = base64.b64encode('kit-kat')
+ data = util.b64e('kit-kat')
cfg = {
'random_seed': {
'file': self._seed_file,
@@ -171,27 +170,30 @@ class TestRandomSeed(t_help.TestCase):
contents = util.load_file(self._seed_file)
self.assertEquals('tiny-tim-was-here-so-was-josh', contents)
- def test_seed_command_not_provided_pollinate_available(self):
+ def test_seed_command_provided_and_available(self):
c = self._get_cloud('ubuntu', {})
self.whichdata = {'pollinate': '/usr/bin/pollinate'}
- cc_seed_random.handle('test', {}, c, LOG, [])
+ cfg = {'random_seed': {'command': ['pollinate', '-q']}}
+ cc_seed_random.handle('test', cfg, c, LOG, [])
subp_args = [f['args'] for f in self.subp_called]
self.assertIn(['pollinate', '-q'], subp_args)
- def test_seed_command_not_provided_pollinate_not_available(self):
+ def test_seed_command_not_provided(self):
c = self._get_cloud('ubuntu', {})
self.whichdata = {}
cc_seed_random.handle('test', {}, c, LOG, [])
# subp should not have been called as which would say not available
- self.assertEquals(self.subp_called, list())
+ self.assertFalse(self.subp_called)
def test_unavailable_seed_command_and_required_raises_error(self):
c = self._get_cloud('ubuntu', {})
self.whichdata = {}
+ cfg = {'random_seed': {'command': ['THIS_NO_COMMAND'],
+ 'command_required': True}}
self.assertRaises(ValueError, cc_seed_random.handle,
- 'test', {'random_seed': {'command_required': True}}, c, LOG, [])
+ 'test', cfg, c, LOG, [])
def test_seed_command_and_required(self):
c = self._get_cloud('ubuntu', {})
diff --git a/tests/unittests/test_handler/test_handler_set_hostname.py b/tests/unittests/test_handler/test_handler_set_hostname.py
index e1530e30..d358b069 100644
--- a/tests/unittests/test_handler/test_handler_set_hostname.py
+++ b/tests/unittests/test_handler/test_handler_set_hostname.py
@@ -7,9 +7,11 @@ from cloudinit import util
from .. import helpers as t_help
+import shutil
+import tempfile
import logging
-from StringIO import StringIO
+from six import BytesIO
from configobj import ConfigObj
@@ -19,7 +21,8 @@ LOG = logging.getLogger(__name__)
class TestHostname(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestHostname, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def _fetch_distro(self, kind):
cls = distros.fetch(kind)
@@ -38,8 +41,8 @@ class TestHostname(t_help.FilesystemMockingTestCase):
cc_set_hostname.handle('cc_set_hostname',
cfg, cc, LOG, [])
if not distro.uses_systemd():
- contents = util.load_file("/etc/sysconfig/network")
- n_cfg = ConfigObj(StringIO(contents))
+ contents = util.load_file("/etc/sysconfig/network", decode=False)
+ n_cfg = ConfigObj(BytesIO(contents))
self.assertEquals({'HOSTNAME': 'blah.blah.blah.yahoo.com'},
dict(n_cfg))
diff --git a/tests/unittests/test_handler/test_handler_snappy.py b/tests/unittests/test_handler/test_handler_snappy.py
new file mode 100644
index 00000000..8aeff53c
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_snappy.py
@@ -0,0 +1,305 @@
+from cloudinit.config.cc_snappy import (
+ makeop, get_package_ops, render_snap_op)
+from cloudinit import util
+from .. import helpers as t_help
+
+import os
+import shutil
+import tempfile
+import yaml
+
+ALLOWED = (dict, list, int, str)
+
+
+class TestInstallPackages(t_help.TestCase):
+ def setUp(self):
+ super(TestInstallPackages, self).setUp()
+ self.unapply = []
+
+ # by default 'which' has nothing in its path
+ self.apply_patches([(util, 'subp', self._subp)])
+ self.subp_called = []
+ self.snapcmds = []
+ self.tmp = tempfile.mkdtemp(prefix="TestInstallPackages")
+
+ def tearDown(self):
+ apply_patches([i for i in reversed(self.unapply)])
+ shutil.rmtree(self.tmp)
+
+ def apply_patches(self, patches):
+ ret = apply_patches(patches)
+ self.unapply += ret
+
+ def populate_tmp(self, files):
+ return t_help.populate_dir(self.tmp, files)
+
+ def _subp(self, *args, **kwargs):
+ # supports subp calling with cmd as args or kwargs
+ if 'args' not in kwargs:
+ kwargs['args'] = args[0]
+ self.subp_called.append(kwargs)
+ args = kwargs['args']
+ # here we basically parse the snappy command invoked
+ # and append to snapcmds a list of (mode, pkg, config)
+ if args[0:2] == ['snappy', 'config']:
+ if args[3] == "-":
+ config = kwargs.get('data', '')
+ else:
+ with open(args[3], "rb") as fp:
+ config = yaml.safe_load(fp.read())
+ self.snapcmds.append(['config', args[2], config])
+ elif args[0:2] == ['snappy', 'install']:
+ config = None
+ pkg = None
+ for arg in args[2:]:
+ if arg.startswith("-"):
+ continue
+ if not pkg:
+ pkg = arg
+ elif not config:
+ cfgfile = arg
+ if cfgfile == "-":
+ config = kwargs.get('data', '')
+ elif cfgfile:
+ with open(cfgfile, "rb") as fp:
+ config = yaml.safe_load(fp.read())
+ self.snapcmds.append(['install', pkg, config])
+
+ def test_package_ops_1(self):
+ ret = get_package_ops(
+ packages=['pkg1', 'pkg2', 'pkg3'],
+ configs={'pkg2': b'mycfg2'}, installed=[])
+ self.assertEqual(
+ ret, [makeop('install', 'pkg1', None, None),
+ makeop('install', 'pkg2', b'mycfg2', None),
+ makeop('install', 'pkg3', None, None)])
+
+ def test_package_ops_config_only(self):
+ ret = get_package_ops(
+ packages=None,
+ configs={'pkg2': b'mycfg2'}, installed=['pkg1', 'pkg2'])
+ self.assertEqual(
+ ret, [makeop('config', 'pkg2', b'mycfg2')])
+
+ def test_package_ops_install_and_config(self):
+ ret = get_package_ops(
+ packages=['pkg3', 'pkg2'],
+ configs={'pkg2': b'mycfg2', 'xinstalled': b'xcfg'},
+ installed=['xinstalled'])
+ self.assertEqual(
+ ret, [makeop('install', 'pkg3'),
+ makeop('install', 'pkg2', b'mycfg2'),
+ makeop('config', 'xinstalled', b'xcfg')])
+
+ def test_package_ops_install_long_config_short(self):
+ # a package can be installed by full name, but have config by short
+ cfg = {'k1': 'k2'}
+ ret = get_package_ops(
+ packages=['config-example.canonical'],
+ configs={'config-example': cfg}, installed=[])
+ self.assertEqual(
+ ret, [makeop('install', 'config-example.canonical', cfg)])
+
+ def test_package_ops_with_file(self):
+ self.populate_tmp(
+ {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg",
+ "snapf2.snap": b"foo2", "foo.bar": "ignored"})
+ ret = get_package_ops(
+ packages=['pkg1'], configs={}, installed=[], fspath=self.tmp)
+ self.assertEqual(
+ ret,
+ [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
+ cfgfile="snapf1.config"),
+ makeop_tmpd(self.tmp, 'install', 'snapf2', path="snapf2.snap"),
+ makeop('install', 'pkg1')])
+
+ def test_package_ops_common_filename(self):
+ # fish package name from filename
+ # package names likely look like: pkgname.namespace_version_arch.snap
+
+ # find filenames
+ self.populate_tmp(
+ {"pkg-ws.smoser_0.3.4_all.snap": "pkg-ws-snapdata",
+ "pkg-ws.config": "pkg-ws-config",
+ "pkg1.smoser_1.2.3_all.snap": "pkg1.snapdata",
+ "pkg1.smoser.config": "pkg1.smoser.config-data",
+ "pkg1.config": "pkg1.config-data",
+ "pkg2.smoser_0.0_amd64.snap": "pkg2-snapdata",
+ "pkg2.smoser_0.0_amd64.config": "pkg2.config"})
+
+ ret = get_package_ops(
+ packages=[], configs={}, installed=[], fspath=self.tmp)
+ self.assertEqual(
+ ret,
+ [makeop_tmpd(self.tmp, 'install', 'pkg-ws.smoser',
+ path="pkg-ws.smoser_0.3.4_all.snap",
+ cfgfile="pkg-ws.config"),
+ makeop_tmpd(self.tmp, 'install', 'pkg1.smoser',
+ path="pkg1.smoser_1.2.3_all.snap",
+ cfgfile="pkg1.smoser.config"),
+ makeop_tmpd(self.tmp, 'install', 'pkg2.smoser',
+ path="pkg2.smoser_0.0_amd64.snap",
+ cfgfile="pkg2.smoser_0.0_amd64.config"),
+ ])
+
+ def test_package_ops_config_overrides_file(self):
+ # config data overrides local file .config
+ self.populate_tmp(
+ {"snapf1.snap": b"foo1", "snapf1.config": b"snapf1cfg"})
+ ret = get_package_ops(
+ packages=[], configs={'snapf1': 'snapf1cfg-config'},
+ installed=[], fspath=self.tmp)
+ self.assertEqual(
+ ret, [makeop_tmpd(self.tmp, 'install', 'snapf1',
+ path="snapf1.snap", config="snapf1cfg-config")])
+
+ def test_package_ops_namespacing(self):
+ cfgs = {
+ 'config-example': {'k1': 'v1'},
+ 'pkg1': {'p1': 'p2'},
+ 'ubuntu-core': {'c1': 'c2'},
+ 'notinstalled.smoser': {'s1': 's2'},
+ }
+ ret = get_package_ops(
+ packages=['config-example.canonical'], configs=cfgs,
+ installed=['config-example.smoser', 'pkg1.canonical',
+ 'ubuntu-core'])
+
+ expected_configs = [
+ makeop('config', 'pkg1', config=cfgs['pkg1']),
+ makeop('config', 'ubuntu-core', config=cfgs['ubuntu-core'])]
+ expected_installs = [
+ makeop('install', 'config-example.canonical',
+ config=cfgs['config-example'])]
+
+ installs = [i for i in ret if i['op'] == 'install']
+ configs = [c for c in ret if c['op'] == 'config']
+
+ self.assertEqual(installs, expected_installs)
+ # configs are not ordered
+ self.assertEqual(len(configs), len(expected_configs))
+ self.assertTrue(all(found in expected_configs for found in configs))
+
+ def test_render_op_localsnap(self):
+ self.populate_tmp({"snapf1.snap": b"foo1"})
+ op = makeop_tmpd(self.tmp, 'install', 'snapf1',
+ path='snapf1.snap')
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['install', op['path'], None]])
+
+ def test_render_op_localsnap_localconfig(self):
+ self.populate_tmp(
+ {"snapf1.snap": b"foo1", 'snapf1.config': b'snapf1cfg'})
+ op = makeop_tmpd(self.tmp, 'install', 'snapf1',
+ path='snapf1.snap', cfgfile='snapf1.config')
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['install', op['path'], 'snapf1cfg']])
+
+ def test_render_op_snap(self):
+ op = makeop('install', 'snapf1')
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['install', 'snapf1', None]])
+
+ def test_render_op_snap_config(self):
+ mycfg = {'key1': 'value1'}
+ name = "snapf1"
+ op = makeop('install', name, config=mycfg)
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['install', name, {'config': {name: mycfg}}]])
+
+ def test_render_op_config_bytes(self):
+ name = "snapf1"
+ mycfg = b'myconfig'
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])
+
+ def test_render_op_config_string(self):
+ name = 'snapf1'
+ mycfg = 'myconfig: foo\nhisconfig: bar\n'
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ self.assertEqual(
+ self.snapcmds, [['config', 'snapf1', {'config': {name: mycfg}}]])
+
+ def test_render_op_config_dict(self):
+ # config entry for package can be a dict, not a string blob
+ mycfg = {'foo': 'bar'}
+ name = 'snapf1'
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ # snapcmds is a list of 3-entry lists. data_found will be the
+ # blob of data in the file in 'snappy install --config=<file>'
+ data_found = self.snapcmds[0][2]
+ self.assertEqual(mycfg, data_found['config'][name])
+
+ def test_render_op_config_list(self):
+ # config entry for package can be a list, not a string blob
+ mycfg = ['foo', 'bar', 'wark', {'f1': 'b1'}]
+ name = "snapf1"
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ data_found = self.snapcmds[0][2]
+ self.assertEqual(mycfg, data_found['config'][name])
+
+ def test_render_op_config_int(self):
+ # config entry for package can be a list, not a string blob
+ mycfg = 1
+ name = 'snapf1'
+ op = makeop('config', name, config=mycfg)
+ render_snap_op(**op)
+ data_found = self.snapcmds[0][2]
+ self.assertEqual(mycfg, data_found['config'][name])
+
+ def test_render_long_configs_short(self):
+ # install a namespaced package should have un-namespaced config
+ mycfg = {'k1': 'k2'}
+ name = 'snapf1'
+ op = makeop('install', name + ".smoser", config=mycfg)
+ render_snap_op(**op)
+ data_found = self.snapcmds[0][2]
+ self.assertEqual(mycfg, data_found['config'][name])
+
+ def test_render_does_not_pad_cfgfile(self):
+ # package_ops with cfgfile should not modify --file= content.
+ mydata = "foo1: bar1\nk: [l1, l2, l3]\n"
+ self.populate_tmp(
+ {"snapf1.snap": b"foo1", "snapf1.config": mydata.encode()})
+ ret = get_package_ops(
+ packages=[], configs={}, installed=[], fspath=self.tmp)
+ self.assertEqual(
+ ret,
+ [makeop_tmpd(self.tmp, 'install', 'snapf1', path="snapf1.snap",
+ cfgfile="snapf1.config")])
+
+ # now the op was ok, but test that render didn't mess it up.
+ render_snap_op(**ret[0])
+ data_found = self.snapcmds[0][2]
+ # the data found gets loaded in the snapcmd interpretation
+ # so this comparison is a bit lossy, but input to snappy config
+ # is expected to be yaml loadable, so it should be OK.
+ self.assertEqual(yaml.safe_load(mydata), data_found)
+
+
+def makeop_tmpd(tmpd, op, name, config=None, path=None, cfgfile=None):
+ if cfgfile:
+ cfgfile = os.path.sep.join([tmpd, cfgfile])
+ if path:
+ path = os.path.sep.join([tmpd, path])
+ return(makeop(op=op, name=name, config=config, path=path, cfgfile=cfgfile))
+
+
+def apply_patches(patches):
+ ret = []
+ for (ref, name, replace) in patches:
+ if replace is None:
+ continue
+ orig = getattr(ref, name)
+ setattr(ref, name, replace)
+ ret.append((ref, name, orig))
+ return ret
diff --git a/tests/unittests/test_handler/test_handler_timezone.py b/tests/unittests/test_handler/test_handler_timezone.py
index 874db340..e3df8759 100644
--- a/tests/unittests/test_handler/test_handler_timezone.py
+++ b/tests/unittests/test_handler/test_handler_timezone.py
@@ -29,8 +29,10 @@ from .. import helpers as t_help
from configobj import ConfigObj
-from StringIO import StringIO
+from six import BytesIO
+import shutil
+import tempfile
import logging
LOG = logging.getLogger(__name__)
@@ -39,7 +41,8 @@ LOG = logging.getLogger(__name__)
class TestTimezone(t_help.FilesystemMockingTestCase):
def setUp(self):
super(TestTimezone, self).setUp()
- self.new_root = self.makeDir(prefix="unittest_")
+ self.new_root = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.new_root)
def _get_cloud(self, distro):
self.patchUtils(self.new_root)
@@ -67,8 +70,8 @@ class TestTimezone(t_help.FilesystemMockingTestCase):
cc_timezone.handle('cc_timezone', cfg, cc, LOG, [])
- contents = util.load_file('/etc/sysconfig/clock')
- n_cfg = ConfigObj(StringIO(contents))
+ contents = util.load_file('/etc/sysconfig/clock', decode=False)
+ n_cfg = ConfigObj(BytesIO(contents))
self.assertEquals({'TIMEZONE': cfg['timezone']}, dict(n_cfg))
contents = util.load_file('/etc/localtime')
diff --git a/tests/unittests/test_handler/test_handler_yum_add_repo.py b/tests/unittests/test_handler/test_handler_yum_add_repo.py
index 435c9787..3a8aa7c1 100644
--- a/tests/unittests/test_handler/test_handler_yum_add_repo.py
+++ b/tests/unittests/test_handler/test_handler_yum_add_repo.py
@@ -4,9 +4,11 @@ from cloudinit.config import cc_yum_add_repo
from .. import helpers
+import shutil
+import tempfile
import logging
-from StringIO import StringIO
+from six import BytesIO
import configobj
@@ -16,7 +18,8 @@ LOG = logging.getLogger(__name__)
class TestConfig(helpers.FilesystemMockingTestCase):
def setUp(self):
super(TestConfig, self).setUp()
- self.tmp = self.makeDir(prefix="unittest_")
+ self.tmp = tempfile.mkdtemp()
+ self.addCleanup(shutil.rmtree, self.tmp)
def test_bad_config(self):
cfg = {
@@ -52,8 +55,9 @@ class TestConfig(helpers.FilesystemMockingTestCase):
}
self.patchUtils(self.tmp)
cc_yum_add_repo.handle('yum_add_repo', cfg, None, LOG, [])
- contents = util.load_file("/etc/yum.repos.d/epel_testing.repo")
- contents = configobj.ConfigObj(StringIO(contents))
+ contents = util.load_file("/etc/yum.repos.d/epel_testing.repo",
+ decode=False)
+ contents = configobj.ConfigObj(BytesIO(contents))
expected = {
'epel_testing': {
'name': 'Extra Packages for Enterprise Linux 5 - Testing',