summaryrefslogtreecommitdiff
path: root/tests/unittests/filters
diff options
context:
space:
mode:
authorBrett Holman <bholman.devel@gmail.com>2021-12-03 13:11:46 -0700
committerGitHub <noreply@github.com>2021-12-03 13:11:46 -0700
commit039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51 (patch)
tree5f1b09486ccaf98ee8159de58d9a2a1ef0af5dc1 /tests/unittests/filters
parentffa6fc88249aa080aa31811a45569a45e567418a (diff)
downloadvyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.tar.gz
vyos-cloud-init-039c40f9b3d88ee8158604bb18ca4bf2fb5d5e51.zip
Reorganize unit test locations under tests/unittests (#1126)
This attempts to standardize unit test file location under test/unittests/ such that any source file located at cloudinit/path/to/file.py may have a corresponding unit test file at test/unittests/path/to/test_file.py. Noteworthy Comments: ==================== Four different duplicate test files existed: test_{gpg,util,cc_mounts,cc_resolv_conf}.py Each of these duplicate file pairs has been merged together. This is a break in git history for these files. The test suite appears to have a dependency on test order. Changing test order causes some tests to fail. This should be rectified, but for now some tests have been modified in tests/unittests/config/test_set_passwords.py. A helper class name starts with "Test" which causes pytest to try executing it as a test case, which then throws warnings "due to Class having __init__()". Silence by changing the name of the class. # helpers.py is imported in many test files, import paths change cloudinit/tests/helpers.py -> tests/unittests/helpers.py # Move directories: cloudinit/distros/tests -> tests/unittests/distros cloudinit/cmd/devel/tests -> tests/unittests/cmd/devel cloudinit/cmd/tests -> tests/unittests/cmd/ cloudinit/sources/helpers/tests -> tests/unittests/sources/helpers cloudinit/sources/tests -> tests/unittests/sources cloudinit/net/tests -> tests/unittests/net cloudinit/config/tests -> tests/unittests/config cloudinit/analyze/tests/ -> tests/unittests/analyze/ # Standardize tests already in tests/unittests/ test_datasource -> sources test_distros -> distros test_vmware -> sources/vmware test_handler -> config # this contains cloudconfig module tests test_runs -> runs
Diffstat (limited to 'tests/unittests/filters')
-rw-r--r--tests/unittests/filters/__init__.py0
-rw-r--r--tests/unittests/filters/test_launch_index.py135
2 files changed, 135 insertions, 0 deletions
diff --git a/tests/unittests/filters/__init__.py b/tests/unittests/filters/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/tests/unittests/filters/__init__.py
diff --git a/tests/unittests/filters/test_launch_index.py b/tests/unittests/filters/test_launch_index.py
new file mode 100644
index 00000000..0b1a7067
--- /dev/null
+++ b/tests/unittests/filters/test_launch_index.py
@@ -0,0 +1,135 @@
+# This file is part of cloud-init. See LICENSE file for license information.
+
+import copy
+from itertools import filterfalse
+
+from tests.unittests import helpers
+
+from cloudinit.filters import launch_index
+from cloudinit import user_data as ud
+from cloudinit import util
+
+
+def count_messages(root):
+ am = 0
+ for m in root.walk():
+ if ud.is_skippable(m):
+ continue
+ am += 1
+ return am
+
+
+class TestLaunchFilter(helpers.ResourceUsingTestCase):
+
+ def assertCounts(self, message, expected_counts):
+ orig_message = copy.deepcopy(message)
+ for (index, count) in expected_counts.items():
+ index = util.safe_int(index)
+ filtered_message = launch_index.Filter(index).apply(message)
+ self.assertEqual(count_messages(filtered_message), count)
+ # Ensure original message still ok/not modified
+ self.assertTrue(self.equivalentMessage(message, orig_message))
+
+ def equivalentMessage(self, msg1, msg2):
+ msg1_count = count_messages(msg1)
+ msg2_count = count_messages(msg2)
+ if msg1_count != msg2_count:
+ return False
+ # Do some basic payload checking
+ msg1_msgs = [m for m in msg1.walk()]
+ msg1_msgs = [m for m in filterfalse(ud.is_skippable, msg1_msgs)]
+ msg2_msgs = [m for m in msg2.walk()]
+ msg2_msgs = [m for m in filterfalse(ud.is_skippable, msg2_msgs)]
+ for i in range(0, len(msg2_msgs)):
+ m1_msg = msg1_msgs[i]
+ m2_msg = msg2_msgs[i]
+ if m1_msg.get_charset() != m2_msg.get_charset():
+ return False
+ if m1_msg.is_multipart() != m2_msg.is_multipart():
+ return False
+ m1_py = m1_msg.get_payload(decode=True)
+ m2_py = m2_msg.get_payload(decode=True)
+ if m1_py != m2_py:
+ return False
+ return True
+
+ def testMultiEmailIndex(self):
+ test_data = helpers.readResource('filter_cloud_multipart_2.email')
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(test_data)
+ self.assertTrue(count_messages(message) > 0)
+ # This file should have the following
+ # indexes -> amount mapping in it
+ expected_counts = {
+ 3: 1,
+ 2: 2,
+ None: 3,
+ -1: 0,
+ }
+ self.assertCounts(message, expected_counts)
+
+ def testHeaderEmailIndex(self):
+ test_data = helpers.readResource('filter_cloud_multipart_header.email')
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(test_data)
+ self.assertTrue(count_messages(message) > 0)
+ # This file should have the following
+ # indexes -> amount mapping in it
+ expected_counts = {
+ 5: 1,
+ -1: 0,
+ 'c': 1,
+ None: 1,
+ }
+ self.assertCounts(message, expected_counts)
+
+ def testConfigEmailIndex(self):
+ test_data = helpers.readResource('filter_cloud_multipart_1.email')
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(test_data)
+ self.assertTrue(count_messages(message) > 0)
+ # This file should have the following
+ # indexes -> amount mapping in it
+ expected_counts = {
+ 2: 1,
+ -1: 0,
+ None: 1,
+ }
+ self.assertCounts(message, expected_counts)
+
+ def testNoneIndex(self):
+ test_data = helpers.readResource('filter_cloud_multipart.yaml')
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(test_data)
+ start_count = count_messages(message)
+ self.assertTrue(start_count > 0)
+ filtered_message = launch_index.Filter(None).apply(message)
+ self.assertTrue(self.equivalentMessage(message, filtered_message))
+
+ def testIndexes(self):
+ test_data = helpers.readResource('filter_cloud_multipart.yaml')
+ ud_proc = ud.UserDataProcessor(self.getCloudPaths())
+ message = ud_proc.process(test_data)
+ start_count = count_messages(message)
+ self.assertTrue(start_count > 0)
+ # This file should have the following
+ # indexes -> amount mapping in it
+ expected_counts = {
+ 2: 2,
+ 3: 2,
+ 1: 2,
+ 0: 1,
+ 4: 1,
+ 7: 0,
+ -1: 0,
+ 100: 0,
+ # None should just give all back
+ None: start_count,
+ # Non ints should be ignored
+ 'c': start_count,
+ # Strings should be converted
+ '1': 2,
+ }
+ self.assertCounts(message, expected_counts)
+
+# vi: ts=4 expandtab