summaryrefslogtreecommitdiff
path: root/tests/unittests
diff options
context:
space:
mode:
authorScott Moser <smoser@ubuntu.com>2013-03-19 08:22:30 -0400
committerScott Moser <smoser@ubuntu.com>2013-03-19 08:22:30 -0400
commita904de054f7818137c864e0f3ff728b3764cf457 (patch)
treee28f4807469f2c39554d4353bf11685c03a1b4aa /tests/unittests
parent204e79b93c882e17df63b24f7f682c0dbefb482d (diff)
parentae0f94c8f39a234d73ab8e2caf24d73439c8b5ee (diff)
downloadvyos-cloud-init-a904de054f7818137c864e0f3ff728b3764cf457.tar.gz
vyos-cloud-init-a904de054f7818137c864e0f3ff728b3764cf457.zip
merge from trunk at revno 799
Diffstat (limited to 'tests/unittests')
-rw-r--r--tests/unittests/helpers.py2
-rw-r--r--tests/unittests/test__init__.py26
-rw-r--r--tests/unittests/test_builtin_handlers.py23
-rw-r--r--tests/unittests/test_datasource/test_nocloud.py2
-rw-r--r--tests/unittests/test_handler/test_handler_growpart.py255
-rw-r--r--tests/unittests/test_merging.py204
-rw-r--r--tests/unittests/test_sshutil.py101
-rw-r--r--tests/unittests/test_userdata.py107
-rw-r--r--tests/unittests/test_util.py95
9 files changed, 714 insertions, 101 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py
index 91a50e18..e020a3ec 100644
--- a/tests/unittests/helpers.py
+++ b/tests/unittests/helpers.py
@@ -175,6 +175,7 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
def patchOS(self, new_root):
patch_funcs = {
os.path: ['isfile', 'exists', 'islink', 'isdir'],
+ os: ['listdir'],
}
for (mod, funcs) in patch_funcs.items():
for f in funcs:
@@ -183,6 +184,7 @@ class FilesystemMockingTestCase(ResourceUsingTestCase):
setattr(mod, f, trap_func)
self.patched_funcs.append((mod, f, func))
+
def populate_dir(path, files):
os.makedirs(path)
for (name, content) in files.iteritems():
diff --git a/tests/unittests/test__init__.py b/tests/unittests/test__init__.py
index d707afa9..b4b20e51 100644
--- a/tests/unittests/test__init__.py
+++ b/tests/unittests/test__init__.py
@@ -22,7 +22,8 @@ class FakeModule(handlers.Handler):
def list_types(self):
return self.types
- def _handle_part(self, data, ctype, filename, payload, frequency):
+ def handle_part(self, _data, ctype, filename, # pylint: disable=W0221
+ payload, frequency):
pass
@@ -103,6 +104,9 @@ class TestHandlerHandlePart(MockerTestCase):
self.filename = "fake filename"
self.payload = "fake payload"
self.frequency = settings.PER_INSTANCE
+ self.headers = {
+ 'Content-Type': self.ctype,
+ }
def test_normal_version_1(self):
"""
@@ -118,8 +122,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.payload)
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
def test_normal_version_2(self):
"""
@@ -135,8 +139,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.payload, self.frequency)
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
def test_modfreq_per_always(self):
"""
@@ -152,8 +156,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.payload)
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
def test_no_handle_when_modfreq_once(self):
"""C{handle_part} is not called if frequency is once."""
@@ -163,8 +167,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.mocker.result(settings.PER_ONCE)
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
def test_exception_is_caught(self):
"""Exceptions within C{handle_part} are caught and logged."""
@@ -178,8 +182,8 @@ class TestHandlerHandlePart(MockerTestCase):
self.mocker.throw(Exception())
self.mocker.replay()
- handlers.run_part(mod_mock, self.data, self.ctype, self.filename,
- self.payload, self.frequency)
+ handlers.run_part(mod_mock, self.data, self.filename,
+ self.payload, self.frequency, self.headers)
class TestCmdlineUrl(MockerTestCase):
diff --git a/tests/unittests/test_builtin_handlers.py b/tests/unittests/test_builtin_handlers.py
index 5f41cb3d..9cf28215 100644
--- a/tests/unittests/test_builtin_handlers.py
+++ b/tests/unittests/test_builtin_handlers.py
@@ -1,8 +1,9 @@
"""Tests of the built-in user data handlers."""
import os
+import unittest
-from mocker import MockerTestCase
+from tests.unittests import helpers as test_helpers
from cloudinit import handlers
from cloudinit import helpers
@@ -13,7 +14,7 @@ from cloudinit.handlers import upstart_job
from cloudinit.settings import (PER_ALWAYS, PER_INSTANCE)
-class TestBuiltins(MockerTestCase):
+class TestBuiltins(test_helpers.FilesystemMockingTestCase):
def test_upstart_frequency_no_out(self):
c_root = self.makeDir()
@@ -34,15 +35,20 @@ class TestBuiltins(MockerTestCase):
None, None, None)
self.assertEquals(0, len(os.listdir(up_root)))
+ @unittest.skip("until LP: #1124384 fixed")
def test_upstart_frequency_single(self):
# files should be written out when frequency is ! per-instance
- c_root = self.makeDir()
- up_root = self.makeDir()
+ new_root = self.makeDir()
+ freq = PER_INSTANCE
+
+ self.patchOS(new_root)
+ self.patchUtils(new_root)
paths = helpers.Paths({
- 'cloud_dir': c_root,
- 'upstart_dir': up_root,
+ 'upstart_dir': "/etc/upstart",
})
- freq = PER_INSTANCE
+
+ util.ensure_dir("/run")
+ util.ensure_dir("/etc/upstart")
mock_subp = self.mocker.replace(util.subp, passthrough=False)
mock_subp(["initctl", "reload-configuration"], capture=False)
@@ -55,4 +61,5 @@ class TestBuiltins(MockerTestCase):
'test.conf', 'blah', freq)
h.handle_part('', handlers.CONTENT_END,
None, None, None)
- self.assertEquals(1, len(os.listdir(up_root)))
+
+ self.assertEquals(1, len(os.listdir('/etc/upstart')))
diff --git a/tests/unittests/test_datasource/test_nocloud.py b/tests/unittests/test_datasource/test_nocloud.py
index 28e0a472..62fc5358 100644
--- a/tests/unittests/test_datasource/test_nocloud.py
+++ b/tests/unittests/test_datasource/test_nocloud.py
@@ -1,7 +1,7 @@
from cloudinit import helpers
-from tests.unittests.helpers import populate_dir
from cloudinit.sources import DataSourceNoCloud
from cloudinit import util
+from tests.unittests.helpers import populate_dir
from mocker import MockerTestCase
import os
diff --git a/tests/unittests/test_handler/test_handler_growpart.py b/tests/unittests/test_handler/test_handler_growpart.py
new file mode 100644
index 00000000..b1b872b0
--- /dev/null
+++ b/tests/unittests/test_handler/test_handler_growpart.py
@@ -0,0 +1,255 @@
+from mocker import MockerTestCase
+
+from cloudinit import cloud
+from cloudinit import util
+
+from cloudinit.config import cc_growpart
+
+import errno
+import logging
+import os
+import re
+
+# growpart:
+# mode: auto # off, on, auto, 'growpart', 'parted'
+# devices: ['root']
+
+HELP_PARTED_NO_RESIZE = """
+Usage: parted [OPTION]... [DEVICE [COMMAND [PARAMETERS]...]...]
+Apply COMMANDs with PARAMETERS to DEVICE. If no COMMAND(s) are given, run in
+interactive mode.
+
+OPTIONs:
+<SNIP>
+
+COMMANDs:
+<SNIP>
+ quit exit program
+ rescue START END rescue a lost partition near START
+ and END
+ resize NUMBER START END resize partition NUMBER and its file
+ system
+ rm NUMBER delete partition NUMBER
+<SNIP>
+Report bugs to bug-parted@gnu.org
+"""
+
+HELP_PARTED_RESIZE = """
+Usage: parted [OPTION]... [DEVICE [COMMAND [PARAMETERS]...]...]
+Apply COMMANDs with PARAMETERS to DEVICE. If no COMMAND(s) are given, run in
+interactive mode.
+
+OPTIONs:
+<SNIP>
+
+COMMANDs:
+<SNIP>
+ quit exit program
+ rescue START END rescue a lost partition near START
+ and END
+ resize NUMBER START END resize partition NUMBER and its file
+ system
+ resizepart NUMBER END resize partition NUMBER
+ rm NUMBER delete partition NUMBER
+<SNIP>
+Report bugs to bug-parted@gnu.org
+"""
+
+HELP_GROWPART_RESIZE = """
+growpart disk partition
+ rewrite partition table so that partition takes up all the space it can
+ options:
+ -h | --help print Usage and exit
+<SNIP>
+ -u | --update R update the the kernel partition table info after growing
+ this requires kernel support and 'partx --update'
+ R is one of:
+ - 'auto' : [default] update partition if possible
+<SNIP>
+ Example:
+ - growpart /dev/sda 1
+ Resize partition 1 on /dev/sda
+"""
+
+HELP_GROWPART_NO_RESIZE = """
+growpart disk partition
+ rewrite partition table so that partition takes up all the space it can
+ options:
+ -h | --help print Usage and exit
+<SNIP>
+ Example:
+ - growpart /dev/sda 1
+ Resize partition 1 on /dev/sda
+"""
+
+
+class TestDisabled(MockerTestCase):
+ def setUp(self):
+ super(TestDisabled, self).setUp()
+ self.name = "growpart"
+ self.cloud_init = None
+ self.log = logging.getLogger("TestDisabled")
+ self.args = []
+
+ self.handle = cc_growpart.handle
+
+ def test_mode_off(self):
+ #Test that nothing is done if mode is off.
+
+ # this really only verifies that resizer_factory isn't called
+ config = {'growpart': {'mode': 'off'}}
+ self.mocker.replace(cc_growpart.resizer_factory,
+ passthrough=False)
+ self.mocker.replay()
+
+ self.handle(self.name, config, self.cloud_init, self.log, self.args)
+
+
+class TestConfig(MockerTestCase):
+ def setUp(self):
+ super(TestConfig, self).setUp()
+ self.name = "growpart"
+ self.paths = None
+ self.cloud = cloud.Cloud(None, self.paths, None, None, None)
+ self.log = logging.getLogger("TestConfig")
+ self.args = []
+ os.environ = {}
+
+ self.cloud_init = None
+ self.handle = cc_growpart.handle
+
+ # Order must be correct
+ self.mocker.order()
+
+ def test_no_resizers_auto_is_fine(self):
+ subp = self.mocker.replace(util.subp, passthrough=False)
+ subp(['parted', '--help'], env={'LANG': 'C'})
+ self.mocker.result((HELP_PARTED_NO_RESIZE, ""))
+ subp(['growpart', '--help'], env={'LANG': 'C'})
+ self.mocker.result((HELP_GROWPART_NO_RESIZE, ""))
+ self.mocker.replay()
+
+ config = {'growpart': {'mode': 'auto'}}
+ self.handle(self.name, config, self.cloud_init, self.log, self.args)
+
+ def test_no_resizers_mode_growpart_is_exception(self):
+ subp = self.mocker.replace(util.subp, passthrough=False)
+ subp(['growpart', '--help'], env={'LANG': 'C'})
+ self.mocker.result((HELP_GROWPART_NO_RESIZE, ""))
+ self.mocker.replay()
+
+ config = {'growpart': {'mode': "growpart"}}
+ self.assertRaises(ValueError, self.handle, self.name, config,
+ self.cloud_init, self.log, self.args)
+
+ def test_mode_auto_prefers_parted(self):
+ subp = self.mocker.replace(util.subp, passthrough=False)
+ subp(['parted', '--help'], env={'LANG': 'C'})
+ self.mocker.result((HELP_PARTED_RESIZE, ""))
+ self.mocker.replay()
+
+ ret = cc_growpart.resizer_factory(mode="auto")
+ self.assertTrue(isinstance(ret, cc_growpart.ResizeParted))
+
+ def test_handle_with_no_growpart_entry(self):
+ #if no 'growpart' entry in config, then mode=auto should be used
+
+ myresizer = object()
+
+ factory = self.mocker.replace(cc_growpart.resizer_factory,
+ passthrough=False)
+ rsdevs = self.mocker.replace(cc_growpart.resize_devices,
+ passthrough=False)
+ factory("auto")
+ self.mocker.result(myresizer)
+ rsdevs(myresizer, ["/"])
+ self.mocker.result((("/", cc_growpart.RESIZE.CHANGED, "my-message",),))
+ self.mocker.replay()
+
+ try:
+ orig_resizers = cc_growpart.RESIZERS
+ cc_growpart.RESIZERS = (('mysizer', object),)
+ self.handle(self.name, {}, self.cloud_init, self.log, self.args)
+ finally:
+ cc_growpart.RESIZERS = orig_resizers
+
+
+class TestResize(MockerTestCase):
+ def setUp(self):
+ super(TestResize, self).setUp()
+ self.name = "growpart"
+ self.log = logging.getLogger("TestResize")
+
+ # Order must be correct
+ self.mocker.order()
+
+ def test_simple_devices(self):
+ #test simple device list
+ # this patches out devent2dev, os.stat, and device_part_info
+ # so in the end, doesn't test a lot
+ devs = ["/dev/XXda1", "/dev/YYda2"]
+ devstat_ret = Bunch(st_mode=25008, st_ino=6078, st_dev=5L,
+ st_nlink=1, st_uid=0, st_gid=6, st_size=0,
+ st_atime=0, st_mtime=0, st_ctime=0)
+ enoent = ["/dev/NOENT"]
+ real_stat = os.stat
+ resize_calls = []
+
+ class myresizer(object):
+ def resize(self, diskdev, partnum, partdev):
+ resize_calls.append((diskdev, partnum, partdev))
+ if partdev == "/dev/YYda2":
+ return (1024, 2048)
+ return (1024, 1024) # old size, new size
+
+ def mystat(path):
+ if path in devs:
+ return devstat_ret
+ if path in enoent:
+ e = OSError("%s: does not exist" % path)
+ e.errno = errno.ENOENT
+ raise e
+ return real_stat(path)
+
+ try:
+ opinfo = cc_growpart.device_part_info
+ cc_growpart.device_part_info = simple_device_part_info
+ os.stat = mystat
+
+ resized = cc_growpart.resize_devices(myresizer(), devs + enoent)
+
+ def find(name, res):
+ for f in res:
+ if f[0] == name:
+ return f
+ return None
+
+ self.assertEqual(cc_growpart.RESIZE.NOCHANGE,
+ find("/dev/XXda1", resized)[1])
+ self.assertEqual(cc_growpart.RESIZE.CHANGED,
+ find("/dev/YYda2", resized)[1])
+ self.assertEqual(cc_growpart.RESIZE.SKIPPED,
+ find(enoent[0], resized)[1])
+ #self.assertEqual(resize_calls,
+ #[("/dev/XXda", "1", "/dev/XXda1"),
+ #("/dev/YYda", "2", "/dev/YYda2")])
+ finally:
+ cc_growpart.device_part_info = opinfo
+ os.stat = real_stat
+
+
+def simple_device_part_info(devpath):
+ # simple stupid return (/dev/vda, 1) for /dev/vda
+ ret = re.search("([^0-9]*)([0-9]*)$", devpath)
+ x = (ret.group(1), ret.group(2))
+ return x
+
+
+class Bunch:
+ st_mode = None # fix pylint complaint
+
+ def __init__(self, **kwds):
+ self.__dict__.update(kwds)
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_merging.py b/tests/unittests/test_merging.py
index 0037b966..ad137e85 100644
--- a/tests/unittests/test_merging.py
+++ b/tests/unittests/test_merging.py
@@ -1,62 +1,142 @@
-from mocker import MockerTestCase
-
-from cloudinit import util
-
-
-class TestMergeDict(MockerTestCase):
- def test_simple_merge(self):
- """Test simple non-conflict merge."""
- source = {"key1": "value1"}
- candidate = {"key2": "value2"}
- result = util.mergedict(source, candidate)
- self.assertEqual({"key1": "value1", "key2": "value2"}, result)
-
- def test_nested_merge(self):
- """Test nested merge."""
- source = {"key1": {"key1.1": "value1.1"}}
- candidate = {"key1": {"key1.2": "value1.2"}}
- result = util.mergedict(source, candidate)
- self.assertEqual(
- {"key1": {"key1.1": "value1.1", "key1.2": "value1.2"}}, result)
-
- def test_merge_does_not_override(self):
- """Test that candidate doesn't override source."""
- source = {"key1": "value1", "key2": "value2"}
- candidate = {"key1": "value2", "key2": "NEW VALUE"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_candidate(self):
- """Test empty candidate doesn't change source."""
- source = {"key": "value"}
- candidate = {}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_empty_source(self):
- """Test empty source is replaced by candidate."""
- source = {}
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(candidate, result)
-
- def test_non_dict_candidate(self):
- """Test non-dict candidate is discarded."""
- source = {"key": "value"}
- candidate = "not a dict"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_non_dict_source(self):
- """Test non-dict source is not modified with a dict candidate."""
- source = "not a dict"
- candidate = {"key": "value"}
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
-
- def test_neither_dict(self):
- """Test if neither candidate or source is dict source wins."""
- source = "source"
- candidate = "candidate"
- result = util.mergedict(source, candidate)
- self.assertEqual(source, result)
+from tests.unittests import helpers
+
+from cloudinit import mergers
+
+
+class TestSimpleRun(helpers.MockerTestCase):
+ def test_basic_merge(self):
+ source = {
+ 'Blah': ['blah2'],
+ 'Blah3': 'c',
+ }
+ merge_with = {
+ 'Blah2': ['blah3'],
+ 'Blah3': 'b',
+ 'Blah': ['123'],
+ }
+ # Basic merge should not do thing special
+ merge_how = "list()+dict()+str()"
+ merger_set = mergers.string_extract_mergers(merge_how)
+ self.assertEquals(3, len(merger_set))
+ merger = mergers.construct(merger_set)
+ merged = merger.merge(source, merge_with)
+ self.assertEquals(merged['Blah'], ['blah2'])
+ self.assertEquals(merged['Blah2'], ['blah3'])
+ self.assertEquals(merged['Blah3'], 'c')
+
+ def test_dict_overwrite(self):
+ source = {
+ 'Blah': ['blah2'],
+ }
+ merge_with = {
+ 'Blah': ['123'],
+ }
+ # Now lets try a dict overwrite
+ merge_how = "list()+dict(overwrite)+str()"
+ merger_set = mergers.string_extract_mergers(merge_how)
+ self.assertEquals(3, len(merger_set))
+ merger = mergers.construct(merger_set)
+ merged = merger.merge(source, merge_with)
+ self.assertEquals(merged['Blah'], ['123'])
+
+ def test_string_append(self):
+ source = {
+ 'Blah': 'blah2',
+ }
+ merge_with = {
+ 'Blah': '345',
+ }
+ merge_how = "list()+dict()+str(append)"
+ merger_set = mergers.string_extract_mergers(merge_how)
+ self.assertEquals(3, len(merger_set))
+ merger = mergers.construct(merger_set)
+ merged = merger.merge(source, merge_with)
+ self.assertEquals(merged['Blah'], 'blah2345')
+
+ def test_list_extend(self):
+ source = ['abc']
+ merge_with = ['123']
+ merge_how = "list(extend)+dict()+str()"
+ merger_set = mergers.string_extract_mergers(merge_how)
+ self.assertEquals(3, len(merger_set))
+ merger = mergers.construct(merger_set)
+ merged = merger.merge(source, merge_with)
+ self.assertEquals(merged, ['abc', '123'])
+
+ def test_deep_merge(self):
+ source = {
+ 'a': [1, 'b', 2],
+ 'b': 'blahblah',
+ 'c': {
+ 'e': [1, 2, 3],
+ 'f': 'bigblobof',
+ 'iamadict': {
+ 'ok': 'ok',
+ }
+ },
+ 'run': [
+ 'runme',
+ 'runme2',
+ ],
+ 'runmereally': [
+ 'e', ['a'], 'd',
+ ],
+ }
+ merge_with = {
+ 'a': ['e', 'f', 'g'],
+ 'b': 'more',
+ 'c': {
+ 'a': 'b',
+ 'f': 'stuff',
+ },
+ 'run': [
+ 'morecmd',
+ 'moremoremore',
+ ],
+ 'runmereally': [
+ 'blah', ['b'], 'e',
+ ],
+ }
+ merge_how = "list(extend)+dict()+str(append)"
+ merger_set = mergers.string_extract_mergers(merge_how)
+ self.assertEquals(3, len(merger_set))
+ merger = mergers.construct(merger_set)
+ merged = merger.merge(source, merge_with)
+ self.assertEquals(merged['a'], [1, 'b', 2, 'e', 'f', 'g'])
+ self.assertEquals(merged['b'], 'blahblahmore')
+ self.assertEquals(merged['c']['f'], 'bigblobofstuff')
+ self.assertEquals(merged['run'], ['runme', 'runme2', 'morecmd',
+ 'moremoremore'])
+ self.assertEquals(merged['runmereally'], ['e', ['a'], 'd', 'blah',
+ ['b'], 'e'])
+
+ def test_dict_overwrite_layered(self):
+ source = {
+ 'Blah3': {
+ 'f': '3',
+ 'g': {
+ 'a': 'b',
+ }
+ }
+ }
+ merge_with = {
+ 'Blah3': {
+ 'e': '2',
+ 'g': {
+ 'e': 'f',
+ }
+ }
+ }
+ merge_how = "list()+dict()+str()"
+ merger_set = mergers.string_extract_mergers(merge_how)
+ self.assertEquals(3, len(merger_set))
+ merger = mergers.construct(merger_set)
+ merged = merger.merge(source, merge_with)
+ self.assertEquals(merged['Blah3'], {
+ 'e': '2',
+ 'f': '3',
+ 'g': {
+ 'a': 'b',
+ 'e': 'f',
+ }
+ })
diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py
new file mode 100644
index 00000000..d8662cac
--- /dev/null
+++ b/tests/unittests/test_sshutil.py
@@ -0,0 +1,101 @@
+from cloudinit import ssh_util
+from unittest import TestCase
+
+
+VALID_CONTENT = {
+ 'dsa': (
+ "AAAAB3NzaC1kc3MAAACBAIrjOQSlSea19bExXBMBKBvcLhBoVvNBjCppNzllipF"
+ "W4jgIOMcNanULRrZGjkOKat6MWJNetSbV1E6IOFDQ16rQgsh/OvYU9XhzM8seLa"
+ "A21VszZuhIV7/2DE3vxu7B54zVzueG1O1Deq6goQCRGWBUnqO2yluJiG4HzrnDa"
+ "jzRAAAAFQDMPO96qXd4F5A+5b2f2MO7SpVomQAAAIBpC3K2zIbDLqBBs1fn7rsv"
+ "KcJvwihdlVjG7UXsDB76P2GNqVG+IlYPpJZ8TO/B/fzTMtrdXp9pSm9OY1+BgN4"
+ "REsZ2WNcvfgY33aWaEM+ieCcQigvxrNAF2FTVcbUIIxAn6SmHuQSWrLSfdHc8H7"
+ "hsrgeUPPdzjBD/cv2ZmqwZ1AAAAIAplIsScrJut5wJMgyK1JG0Kbw9JYQpLe95P"
+ "obB069g8+mYR8U0fysmTEdR44mMu0VNU5E5OhTYoTGfXrVrkR134LqFM2zpVVbE"
+ "JNDnIqDHxTkc6LY2vu8Y2pQ3/bVnllZZOda2oD5HQ7ovygQa6CH+fbaZHbdDUX/"
+ "5z7u2rVAlDw=="
+ ),
+ 'ecdsa': (
+ "AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBITrGBB3cgJ"
+ "J7fPxvtMW9H3oRisNpJ3OAslxZeyP7I0A9BPAW0RQIwHVtVnM7zrp4nI+JLZov/"
+ "Ql7lc2leWL7CY="
+ ),
+ 'rsa': (
+ "AAAAB3NzaC1yc2EAAAABIwAAAQEA3I7VUf2l5gSn5uavROsc5HRDpZdQueUq5oz"
+ "emNSj8T7enqKHOEaFoU2VoPgGEWC9RyzSQVeyD6s7APMcE82EtmW4skVEgEGSbD"
+ "c1pvxzxtchBj78hJP6Cf5TCMFSXw+Fz5rF1dR23QDbN1mkHs7adr8GW4kSWqU7Q"
+ "7NDwfIrJJtO7Hi42GyXtvEONHbiRPOe8stqUly7MvUoN+5kfjBM8Qqpfl2+FNhT"
+ "YWpMfYdPUnE7u536WqzFmsaqJctz3gBxH9Ex7dFtrxR4qiqEr9Qtlu3xGn7Bw07"
+ "/+i1D+ey3ONkZLN+LQ714cgj8fRS4Hj29SCmXp5Kt5/82cD/VN3NtHw=="
+ ),
+}
+
+TEST_OPTIONS = ("no-port-forwarding,no-agent-forwarding,no-X11-forwarding,"
+ 'command="echo \'Please login as the user \"ubuntu\" rather than the'
+ 'user \"root\".\';echo;sleep 10"')
+
+
+class TestAuthKeyLineParser(TestCase):
+ def test_simple_parse(self):
+ # test key line with common 3 fields (keytype, base64, comment)
+ parser = ssh_util.AuthKeyLineParser()
+ for ktype in ['rsa', 'ecdsa', 'dsa']:
+ content = VALID_CONTENT[ktype]
+ comment = 'user-%s@host' % ktype
+ line = ' '.join((ktype, content, comment,))
+ key = parser.parse(line)
+
+ self.assertEqual(key.base64, content)
+ self.assertFalse(key.options)
+ self.assertEqual(key.comment, comment)
+ self.assertEqual(key.keytype, ktype)
+
+ def test_parse_no_comment(self):
+ # test key line with key type and base64 only
+ parser = ssh_util.AuthKeyLineParser()
+ for ktype in ['rsa', 'ecdsa', 'dsa']:
+ content = VALID_CONTENT[ktype]
+ line = ' '.join((ktype, content,))
+ key = parser.parse(line)
+
+ self.assertEqual(key.base64, content)
+ self.assertFalse(key.options)
+ self.assertFalse(key.comment)
+ self.assertEqual(key.keytype, ktype)
+
+ def test_parse_with_keyoptions(self):
+ # test key line with options in it
+ parser = ssh_util.AuthKeyLineParser()
+ options = TEST_OPTIONS
+ for ktype in ['rsa', 'ecdsa', 'dsa']:
+ content = VALID_CONTENT[ktype]
+ comment = 'user-%s@host' % ktype
+ line = ' '.join((options, ktype, content, comment,))
+ key = parser.parse(line)
+
+ self.assertEqual(key.base64, content)
+ self.assertEqual(key.options, options)
+ self.assertEqual(key.comment, comment)
+ self.assertEqual(key.keytype, ktype)
+
+ def test_parse_with_options_passed_in(self):
+ # test key line with key type and base64 only
+ parser = ssh_util.AuthKeyLineParser()
+
+ baseline = ' '.join(("rsa", VALID_CONTENT['rsa'], "user@host"))
+ myopts = "no-port-forwarding,no-agent-forwarding"
+
+ key = parser.parse("allowedopt" + " " + baseline)
+ self.assertEqual(key.options, "allowedopt")
+
+ key = parser.parse("overridden_opt " + baseline, options=myopts)
+ self.assertEqual(key.options, myopts)
+
+ def test_parse_invalid_keytype(self):
+ parser = ssh_util.AuthKeyLineParser()
+ key = parser.parse(' '.join(["badkeytype", VALID_CONTENT['rsa']]))
+
+ self.assertFalse(key.valid())
+
+
+# vi: ts=4 expandtab
diff --git a/tests/unittests/test_userdata.py b/tests/unittests/test_userdata.py
index 82a4c555..fdfe2542 100644
--- a/tests/unittests/test_userdata.py
+++ b/tests/unittests/test_userdata.py
@@ -7,14 +7,17 @@ import os
from email.mime.base import MIMEBase
-from mocker import MockerTestCase
-
+from cloudinit import handlers
+from cloudinit import helpers as c_helpers
from cloudinit import log
from cloudinit import sources
from cloudinit import stages
+from cloudinit import util
INSTANCE_ID = "i-testing"
+from tests.unittests import helpers
+
class FakeDataSource(sources.DataSource):
@@ -26,22 +29,16 @@ class FakeDataSource(sources.DataSource):
# FIXME: these tests shouldn't be checking log output??
# Weirddddd...
-
-
-class TestConsumeUserData(MockerTestCase):
+class TestConsumeUserData(helpers.FilesystemMockingTestCase):
def setUp(self):
- MockerTestCase.setUp(self)
- # Replace the write so no actual files
- # get written out...
- self.mock_write = self.mocker.replace("cloudinit.util.write_file",
- passthrough=False)
+ helpers.FilesystemMockingTestCase.setUp(self)
self._log = None
self._log_file = None
self._log_handler = None
def tearDown(self):
- MockerTestCase.tearDown(self)
+ helpers.FilesystemMockingTestCase.tearDown(self)
if self._log_handler and self._log:
self._log.removeHandler(self._log_handler)
@@ -53,13 +50,77 @@ class TestConsumeUserData(MockerTestCase):
self._log.addHandler(self._log_handler)
return log_file
+ def test_merging_cloud_config(self):
+ blob = '''
+#cloud-config
+a: b
+e: f
+run:
+ - b
+ - c
+'''
+ message1 = MIMEBase("text", "cloud-config")
+ message1['Merge-Type'] = 'dict()+list(extend)+str(append)'
+ message1.set_payload(blob)
+
+ blob2 = '''
+#cloud-config
+a: e
+e: g
+run:
+ - stuff
+ - morestuff
+'''
+ message2 = MIMEBase("text", "cloud-config")
+ message2['X-Merge-Type'] = 'dict()+list(extend)+str()'
+ message2.set_payload(blob2)
+
+ blob3 = '''
+#cloud-config
+e:
+ - 1
+ - 2
+ - 3
+p: 1
+'''
+ message3 = MIMEBase("text", "cloud-config")
+ message3['Merge-Type'] = 'dict()+list()+str()'
+ message3.set_payload(blob3)
+
+ messages = [message1, message2, message3]
+
+ paths = c_helpers.Paths({}, ds=FakeDataSource(''))
+ cloud_cfg = handlers.cloud_config.CloudConfigPartHandler(paths)
+
+ new_root = self.makeDir()
+ self.patchUtils(new_root)
+ self.patchOS(new_root)
+ cloud_cfg.handle_part(None, handlers.CONTENT_START, None, None, None,
+ None)
+ for i, m in enumerate(messages):
+ headers = dict(m)
+ fn = "part-%s" % (i + 1)
+ payload = m.get_payload(decode=True)
+ cloud_cfg.handle_part(None, headers['Content-Type'],
+ fn, payload, None, headers)
+ cloud_cfg.handle_part(None, handlers.CONTENT_END, None, None, None,
+ None)
+ contents = util.load_file(paths.get_ipath('cloud_config'))
+ contents = util.load_yaml(contents)
+ self.assertEquals(contents['run'], ['b', 'c', 'stuff', 'morestuff'])
+ self.assertEquals(contents['a'], 'be')
+ self.assertEquals(contents['e'], 'fg')
+ self.assertEquals(contents['p'], 1)
+
def test_unhandled_type_warning(self):
"""Raw text without magic is ignored but shows warning."""
ci = stages.Init()
data = "arbitrary text\n"
ci.datasource = FakeDataSource(data)
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
@@ -76,7 +137,9 @@ class TestConsumeUserData(MockerTestCase):
message.set_payload("Just text")
ci.datasource = FakeDataSource(message.as_string())
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
@@ -93,8 +156,10 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(script)
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mock_write(outpath, script, 0700)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write(outpath, script, 0700)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
@@ -111,8 +176,10 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
- self.mock_write(outpath, script, 0700)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write(outpath, script, 0700)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
@@ -129,8 +196,10 @@ class TestConsumeUserData(MockerTestCase):
ci.datasource = FakeDataSource(message.as_string())
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
- self.mock_write(outpath, script, 0700)
- self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
+ mock_write = self.mocker.replace("cloudinit.util.write_file",
+ passthrough=False)
+ mock_write(outpath, script, 0700)
+ mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
self.mocker.replay()
log_file = self.capture_log(logging.WARNING)
diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py
index 02611581..7ff9a57f 100644
--- a/tests/unittests/test_util.py
+++ b/tests/unittests/test_util.py
@@ -248,4 +248,99 @@ class TestLoadYaml(TestCase):
myobj)
+class TestMountinfoParsing(TestCase):
+ precise_ext4_mountinfo = \
+"""15 20 0:14 / /sys rw,nosuid,nodev,noexec,relatime - sysfs sysfs rw
+16 20 0:3 / /proc rw,nosuid,nodev,noexec,relatime - proc proc rw
+17 20 0:5 / /dev rw,relatime - devtmpfs udev rw,size=16422216k,nr_inodes=4105554,mode=755
+18 17 0:11 / /dev/pts rw,nosuid,noexec,relatime - devpts devpts rw,gid=5,mode=620,ptmxmode=000
+19 20 0:15 / /run rw,nosuid,relatime - tmpfs tmpfs rw,size=6572812k,mode=755
+20 1 252:1 / / rw,relatime - ext4 /dev/mapper/vg0-root rw,errors=remount-ro,data=ordered
+21 15 0:16 / /sys/fs/cgroup rw,relatime - tmpfs cgroup rw,mode=755
+22 15 0:17 / /sys/fs/fuse/connections rw,relatime - fusectl none rw
+23 15 0:6 / /sys/kernel/debug rw,relatime - debugfs none rw
+25 15 0:10 / /sys/kernel/security rw,relatime - securityfs none rw
+26 19 0:19 / /run/lock rw,nosuid,nodev,noexec,relatime - tmpfs none rw,size=5120k
+27 19 0:20 / /run/shm rw,nosuid,nodev,relatime - tmpfs none rw
+28 19 0:21 / /run/user rw,nosuid,nodev,noexec,relatime - tmpfs none rw,size=102400k,mode=755
+24 21 0:18 / /sys/fs/cgroup/cpuset rw,relatime - cgroup cgroup rw,cpuset
+29 21 0:22 / /sys/fs/cgroup/cpu rw,relatime - cgroup cgroup rw,cpu
+30 21 0:23 / /sys/fs/cgroup/cpuacct rw,relatime - cgroup cgroup rw,cpuacct
+31 21 0:24 / /sys/fs/cgroup/memory rw,relatime - cgroup cgroup rw,memory
+32 21 0:25 / /sys/fs/cgroup/devices rw,relatime - cgroup cgroup rw,devices
+33 21 0:26 / /sys/fs/cgroup/freezer rw,relatime - cgroup cgroup rw,freezer
+34 21 0:27 / /sys/fs/cgroup/blkio rw,relatime - cgroup cgroup rw,blkio
+35 21 0:28 / /sys/fs/cgroup/perf_event rw,relatime - cgroup cgroup rw,perf_event
+36 20 9:0 / /boot rw,relatime - ext4 /dev/md0 rw,data=ordered
+37 16 0:29 / /proc/sys/fs/binfmt_misc rw,nosuid,nodev,noexec,relatime - binfmt_misc binfmt_misc rw
+39 28 0:30 / /run/user/foobar/gvfs rw,nosuid,nodev,relatime - fuse.gvfsd-fuse gvfsd-fuse rw,user_id=1000,group_id=1000"""
+
+ raring_btrfs_mountinfo = \
+"""15 20 0:14 / /sys rw,nosuid,nodev,noexec,relatime - sysfs sysfs rw
+16 20 0:3 / /proc rw,nosuid,nodev,noexec,relatime - proc proc rw
+17 20 0:5 / /dev rw,relatime - devtmpfs udev rw,size=865556k,nr_inodes=216389,mode=755
+18 17 0:11 / /dev/pts rw,nosuid,noexec,relatime - devpts devpts rw,gid=5,mode=620,ptmxmode=000
+19 20 0:15 / /run rw,nosuid,relatime - tmpfs tmpfs rw,size=348196k,mode=755
+20 1 0:16 /@ / rw,relatime - btrfs /dev/vda1 rw,compress=lzo,space_cache
+21 15 0:19 / /sys/fs/fuse/connections rw,relatime - fusectl none rw
+22 15 0:6 / /sys/kernel/debug rw,relatime - debugfs none rw
+23 15 0:10 / /sys/kernel/security rw,relatime - securityfs none rw
+24 19 0:20 / /run/lock rw,nosuid,nodev,noexec,relatime - tmpfs none rw,size=5120k
+25 19 0:21 / /run/shm rw,nosuid,nodev,relatime - tmpfs none rw
+26 19 0:22 / /run/user rw,nosuid,nodev,noexec,relatime - tmpfs none rw,size=102400k,mode=755
+27 20 0:16 /@home /home rw,relatime - btrfs /dev/vda1 rw,compress=lzo,space_cache"""
+
+ def test_invalid_mountinfo(self):
+ line = "20 1 252:1 / / rw,relatime - ext4 /dev/mapper/vg0-root rw,errors=remount-ro,data=ordered"
+ elements = line.split()
+ for i in range(len(elements) + 1):
+ lines = [' '.join(elements[0:i])]
+ if i < 10:
+ expected = None
+ else:
+ expected = ('/dev/mapper/vg0-root', 'ext4', '/')
+ self.assertEqual(expected, util.parse_mount_info('/', lines))
+
+ def test_precise_ext4_root(self):
+ lines = TestMountinfoParsing.precise_ext4_mountinfo.splitlines()
+
+ expected = ('/dev/mapper/vg0-root', 'ext4', '/')
+ self.assertEqual(expected, util.parse_mount_info('/', lines))
+ self.assertEqual(expected, util.parse_mount_info('/usr', lines))
+ self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines))
+
+ expected = ('/dev/md0', 'ext4', '/boot')
+ self.assertEqual(expected, util.parse_mount_info('/boot', lines))
+ self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines))
+
+ expected = ('/dev/mapper/vg0-root', 'ext4', '/')
+ self.assertEqual(expected, util.parse_mount_info('/home', lines))
+ self.assertEqual(expected, util.parse_mount_info('/home/me', lines))
+
+ expected = ('tmpfs', 'tmpfs', '/run')
+ self.assertEqual(expected, util.parse_mount_info('/run', lines))
+
+ expected = ('none', 'tmpfs', '/run/lock')
+ self.assertEqual(expected, util.parse_mount_info('/run/lock', lines))
+
+ def test_raring_btrfs_root(self):
+ lines = TestMountinfoParsing.raring_btrfs_mountinfo.splitlines()
+
+ expected = ('/dev/vda1', 'btrfs', '/')
+ self.assertEqual(expected, util.parse_mount_info('/', lines))
+ self.assertEqual(expected, util.parse_mount_info('/usr', lines))
+ self.assertEqual(expected, util.parse_mount_info('/usr/bin', lines))
+ self.assertEqual(expected, util.parse_mount_info('/boot', lines))
+ self.assertEqual(expected, util.parse_mount_info('/boot/grub', lines))
+
+ expected = ('/dev/vda1', 'btrfs', '/home')
+ self.assertEqual(expected, util.parse_mount_info('/home', lines))
+ self.assertEqual(expected, util.parse_mount_info('/home/me', lines))
+
+ expected = ('tmpfs', 'tmpfs', '/run')
+ self.assertEqual(expected, util.parse_mount_info('/run', lines))
+
+ expected = ('none', 'tmpfs', '/run/lock')
+ self.assertEqual(expected, util.parse_mount_info('/run/lock', lines))
+
# vi: ts=4 expandtab