diff options
Diffstat (limited to 'tests/unittests')
-rw-r--r-- | tests/unittests/helpers.py | 42 | ||||
-rw-r--r-- | tests/unittests/test_filters/test_launch_index.py | 134 |
2 files changed, 176 insertions, 0 deletions
diff --git a/tests/unittests/helpers.py b/tests/unittests/helpers.py new file mode 100644 index 00000000..d0f09e70 --- /dev/null +++ b/tests/unittests/helpers.py @@ -0,0 +1,42 @@ +import os + +from mocker import MockerTestCase + +from cloudinit import helpers as ch + + +class ResourceUsingTestCase(MockerTestCase): + def __init__(self, methodName="runTest"): + MockerTestCase.__init__(self, methodName) + self.resource_path = None + + def resourceLocation(self, subname=None): + if self.resource_path is None: + paths = [ + os.path.join('tests', 'data'), + os.path.join('data'), + os.path.join(os.pardir, 'tests', 'data'), + os.path.join(os.pardir, 'data'), + ] + for p in paths: + if os.path.isdir(p): + self.resource_path = p + break + self.assertTrue((self.resource_path and + os.path.isdir(self.resource_path)), + msg="Unable to locate test resource data path!") + if not subname: + return self.resource_path + return os.path.join(self.resource_path, subname) + + def readResource(self, name): + where = self.resourceLocation(name) + with open(where, 'r') as fh: + return fh.read() + + def getCloudPaths(self): + cp = ch.Paths({ + 'cloud_dir': self.makeDir(), + 'templates_dir': self.resourceLocation(), + }) + return cp diff --git a/tests/unittests/test_filters/test_launch_index.py b/tests/unittests/test_filters/test_launch_index.py new file mode 100644 index 00000000..7ca7cbb6 --- /dev/null +++ b/tests/unittests/test_filters/test_launch_index.py @@ -0,0 +1,134 @@ +import copy + +import helpers as th + +import itertools + +from cloudinit.filters import launch_index +from cloudinit import user_data as ud +from cloudinit import util + + +def count_messages(root): + am = 0 + for m in root.walk(): + if ud.is_skippable(m): + continue + am += 1 + return am + + +class TestLaunchFilter(th.ResourceUsingTestCase): + + def assertCounts(self, message, expected_counts): + orig_message = copy.deepcopy(message) + for (index, count) in expected_counts.items(): + index = util.safe_int(index) + filtered_message = launch_index.Filter(index).apply(message) + self.assertEquals(count_messages(filtered_message), count) + # Ensure original message still ok/not modified + self.assertTrue(self.equivalentMessage(message, orig_message)) + + def equivalentMessage(self, msg1, msg2): + msg1_count = count_messages(msg1) + msg2_count = count_messages(msg2) + if msg1_count != msg2_count: + return False + # Do some basic payload checking + msg1_msgs = [m for m in msg1.walk()] + msg1_msgs = [m for m in + itertools.ifilterfalse(ud.is_skippable, msg1_msgs)] + msg2_msgs = [m for m in msg2.walk()] + msg2_msgs = [m for m in + itertools.ifilterfalse(ud.is_skippable, msg2_msgs)] + for i in range(0, len(msg2_msgs)): + m1_msg = msg1_msgs[i] + m2_msg = msg2_msgs[i] + if m1_msg.get_charset() != m2_msg.get_charset(): + return False + if m1_msg.is_multipart() != m2_msg.is_multipart(): + return False + m1_py = m1_msg.get_payload(decode=True) + m2_py = m2_msg.get_payload(decode=True) + if m1_py != m2_py: + return False + return True + + def testMultiEmailIndex(self): + test_data = self.readResource('filter_cloud_multipart_2.email') + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(test_data) + self.assertTrue(count_messages(message) > 0) + # This file should have the following + # indexes -> amount mapping in it + expected_counts = { + 3: 1, + 2: 2, + None: 3, + -1: 0, + } + self.assertCounts(message, expected_counts) + + def testHeaderEmailIndex(self): + test_data = self.readResource('filter_cloud_multipart_header.email') + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(test_data) + self.assertTrue(count_messages(message) > 0) + # This file should have the following + # indexes -> amount mapping in it + expected_counts = { + 5: 1, + -1: 0, + 'c': 1, + None: 1, + } + self.assertCounts(message, expected_counts) + + def testConfigEmailIndex(self): + test_data = self.readResource('filter_cloud_multipart_1.email') + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(test_data) + self.assertTrue(count_messages(message) > 0) + # This file should have the following + # indexes -> amount mapping in it + expected_counts = { + 2: 1, + -1: 0, + None: 1, + } + self.assertCounts(message, expected_counts) + + def testNoneIndex(self): + test_data = self.readResource('filter_cloud_multipart.yaml') + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(test_data) + start_count = count_messages(message) + self.assertTrue(start_count > 0) + filtered_message = launch_index.Filter(None).apply(message) + self.assertTrue(self.equivalentMessage(message, filtered_message)) + + def testIndexes(self): + test_data = self.readResource('filter_cloud_multipart.yaml') + ud_proc = ud.UserDataProcessor(self.getCloudPaths()) + message = ud_proc.process(test_data) + start_count = count_messages(message) + self.assertTrue(start_count > 0) + # This file should have the following + # indexes -> amount mapping in it + expected_counts = { + 2: 2, + 3: 2, + 1: 2, + 0: 1, + 4: 1, + 7: 0, + -1: 0, + 100: 0, + # None should just give all back + None: start_count, + # Non ints should be ignored + 'c': start_count, + # Strings should be converted + '1': 2, + } + self.assertCounts(message, expected_counts) |