diff options
author | Ćukasz 'sil2100' Zemczak <lukasz.zemczak@ubuntu.com> | 2018-02-05 17:25:14 +0100 |
---|---|---|
committer | usd-importer <ubuntu-server@lists.ubuntu.com> | 2018-02-05 19:15:55 +0000 |
commit | 6c9cd7e1ac55aae259d8e2f06569375e27a12f20 (patch) | |
tree | 335726f611f1ed30aef7d82ff0e2bae0a91ff44b /tests/common/osutil/test_default.py | |
parent | 110d301b04a64d680fc7d102424e303a8e3ca1a6 (diff) | |
parent | d5298bbf0f5696fc948877304e86f43d477d6b71 (diff) | |
download | vyos-walinuxagent-6c9cd7e1ac55aae259d8e2f06569375e27a12f20.tar.gz vyos-walinuxagent-6c9cd7e1ac55aae259d8e2f06569375e27a12f20.zip |
Import patches-applied version 2.2.21-0ubuntu1 to applied/ubuntu/bionic-proposed
Imported using git-ubuntu import.
Changelog parent: 110d301b04a64d680fc7d102424e303a8e3ca1a6
Unapplied parent: d5298bbf0f5696fc948877304e86f43d477d6b71
New changelog entries:
* New upstream release (LP: #1746628).
* debian/patches/disable_import_test.patch: refreshed patch.
Diffstat (limited to 'tests/common/osutil/test_default.py')
-rw-r--r-- | tests/common/osutil/test_default.py | 127 |
1 files changed, 124 insertions, 3 deletions
diff --git a/tests/common/osutil/test_default.py b/tests/common/osutil/test_default.py index 08125ae..c9fa1de 100644 --- a/tests/common/osutil/test_default.py +++ b/tests/common/osutil/test_default.py @@ -489,6 +489,62 @@ Match host 192.168.1.2\n\ print("WRITING TO {0}".format(waagent_sudoers)) self.assertEqual(1, count) + def test_get_firewall_dropped_packets_returns_zero_if_firewall_disabled(self): + osutil._enable_firewall = False + util = osutil.DefaultOSUtil() + + self.assertEqual(0, util.get_firewall_dropped_packets("not used")) + + @patch('azurelinuxagent.common.utils.shellutil.run_get_output') + def test_get_firewall_dropped_packets_returns_negative_if_error(self, mock_output): + osutil._enable_firewall = True + util = osutil.DefaultOSUtil() + + mock_output.side_effect = [ + (0, "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)), + (1, "not used")] + self.assertEqual(-1, util.get_firewall_dropped_packets("not used")) + + @patch('azurelinuxagent.common.utils.shellutil.run_get_output') + def test_get_firewall_dropped_packets_returns_negative_if_exception(self, mock_output): + osutil._enable_firewall = True + util = osutil.DefaultOSUtil() + + mock_output.side_effect = [ + (0, "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)), + (1, Exception)] + self.assertEqual(-1, util.get_firewall_dropped_packets("not used")) + + @patch('azurelinuxagent.common.utils.shellutil.run_get_output') + def test_get_firewall_dropped_packets_transient_error_ignored(self, mock_output): + osutil._enable_firewall = True + util = osutil.DefaultOSUtil() + + mock_output.side_effect = [ + (0, "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)), + (3, "can't initialize iptables table `security': iptables who? (do you need to insmod?)")] + self.assertEqual(0, util.get_firewall_dropped_packets("not used")) + + @patch('azurelinuxagent.common.utils.shellutil.run_get_output') + def test_get_firewall_dropped_packets(self, mock_output): + osutil._enable_firewall = True + util = osutil.DefaultOSUtil() + + mock_output.side_effect = [ + (0, "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)), + (0, +''' + +Chain OUTPUT (policy ACCEPT 104 packets, 43628 bytes) + pkts bytes target prot opt in out source destination + 0 0 ACCEPT tcp -- any any anywhere 168.63.129.16 owner UID match daemon + 32 1920 DROP tcp -- any any anywhere 168.63.129.16 + +''')] + dst = '168.63.129.16' + + self.assertEqual(32, util.get_firewall_dropped_packets(dst)) + @patch('os.getuid', return_value=42) @patch('azurelinuxagent.common.utils.shellutil.run_get_output') @patch('azurelinuxagent.common.utils.shellutil.run') @@ -592,6 +648,34 @@ Match host 192.168.1.2\n\ ]) self.assertFalse(osutil._enable_firewall) + @patch('azurelinuxagent.common.utils.shellutil.run_get_output') + @patch('azurelinuxagent.common.utils.shellutil.run') + def test_enable_firewall_checks_for_invalid_iptables_options(self, mock_run, mock_output): + osutil._enable_firewall = True + util = osutil.DefaultOSUtil() + + dst = '1.2.3.4' + version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION) + wait = "-w" + + # iptables uses the following exit codes + # 0 - correct function + # 1 - other errors + # 2 - errors which appear to be caused by invalid or abused command + # line parameters + mock_run.side_effect = [2] + mock_output.return_value = (0, version) + + self.assertFalse(util.enable_firewall(dst_ip='1.2.3.4', uid=42)) + self.assertFalse(osutil._enable_firewall) + + mock_run.assert_has_calls([ + call(osutil.FIREWALL_DROP.format(wait, "C", dst), chk_err=False), + ]) + mock_output.assert_has_calls([ + call(osutil.IPTABLES_VERSION) + ]) + @patch('os.getuid', return_value=42) @patch('azurelinuxagent.common.utils.shellutil.run_get_output') @patch('azurelinuxagent.common.utils.shellutil.run') @@ -624,17 +708,54 @@ Match host 192.168.1.2\n\ version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION) wait = "-w" - mock_run.side_effect = [0, 0] + mock_run.side_effect = [0, 1, 0, 1] mock_output.side_effect = [(0, version), (0, "Output")] - self.assertTrue(util.remove_firewall()) + self.assertTrue(util.remove_firewall(dst, uid)) mock_run.assert_has_calls([ - call(osutil.FIREWALL_FLUSH.format(wait), chk_err=False) + call(osutil.FIREWALL_DELETE_CONNTRACK.format(wait, dst), chk_err=False), + call(osutil.FIREWALL_DELETE_CONNTRACK.format(wait, dst), chk_err=False), + call(osutil.FIREWALL_DELETE_OWNER.format(wait, dst, uid), chk_err=False), + call(osutil.FIREWALL_DELETE_OWNER.format(wait, dst, uid), chk_err=False), ]) mock_output.assert_has_calls([ call(osutil.IPTABLES_VERSION) ]) self.assertTrue(osutil._enable_firewall) + @patch('os.getuid', return_value=42) + @patch('azurelinuxagent.common.utils.shellutil.run_get_output') + @patch('azurelinuxagent.common.utils.shellutil.run') + def test_remove_firewall_does_not_repeat(self, mock_run, mock_output, _): + osutil._enable_firewall = True + util = osutil.DefaultOSUtil() + + dst_ip='1.2.3.4' + uid=42 + version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION) + wait = "-w" + + mock_run.side_effect = [2] + mock_output.side_effect = [(0, version), (1, "Output")] + self.assertFalse(util.remove_firewall(dst_ip, uid)) + + mock_run.assert_has_calls([ + call(osutil.FIREWALL_DELETE_CONNTRACK.format(wait, dst_ip), chk_err=False), + ]) + mock_output.assert_has_calls([ + call(osutil.IPTABLES_VERSION) + ]) + self.assertFalse(osutil._enable_firewall) + + self.assertTrue(mock_run.call_count == 1) + self.assertTrue(mock_output.call_count == 1) + + self.assertFalse(util.remove_firewall()) + self.assertFalse(util.remove_firewall()) + + self.assertTrue(mock_run.call_count == 1) + self.assertTrue(mock_output.call_count == 1) + + if __name__ == '__main__': unittest.main() |