summaryrefslogtreecommitdiff
path: root/tests/common/osutil/test_default.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/common/osutil/test_default.py')
-rw-r--r--tests/common/osutil/test_default.py127
1 files changed, 124 insertions, 3 deletions
diff --git a/tests/common/osutil/test_default.py b/tests/common/osutil/test_default.py
index 08125ae..c9fa1de 100644
--- a/tests/common/osutil/test_default.py
+++ b/tests/common/osutil/test_default.py
@@ -489,6 +489,62 @@ Match host 192.168.1.2\n\
print("WRITING TO {0}".format(waagent_sudoers))
self.assertEqual(1, count)
+ def test_get_firewall_dropped_packets_returns_zero_if_firewall_disabled(self):
+ osutil._enable_firewall = False
+ util = osutil.DefaultOSUtil()
+
+ self.assertEqual(0, util.get_firewall_dropped_packets("not used"))
+
+ @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
+ def test_get_firewall_dropped_packets_returns_negative_if_error(self, mock_output):
+ osutil._enable_firewall = True
+ util = osutil.DefaultOSUtil()
+
+ mock_output.side_effect = [
+ (0, "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)),
+ (1, "not used")]
+ self.assertEqual(-1, util.get_firewall_dropped_packets("not used"))
+
+ @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
+ def test_get_firewall_dropped_packets_returns_negative_if_exception(self, mock_output):
+ osutil._enable_firewall = True
+ util = osutil.DefaultOSUtil()
+
+ mock_output.side_effect = [
+ (0, "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)),
+ (1, Exception)]
+ self.assertEqual(-1, util.get_firewall_dropped_packets("not used"))
+
+ @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
+ def test_get_firewall_dropped_packets_transient_error_ignored(self, mock_output):
+ osutil._enable_firewall = True
+ util = osutil.DefaultOSUtil()
+
+ mock_output.side_effect = [
+ (0, "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)),
+ (3, "can't initialize iptables table `security': iptables who? (do you need to insmod?)")]
+ self.assertEqual(0, util.get_firewall_dropped_packets("not used"))
+
+ @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
+ def test_get_firewall_dropped_packets(self, mock_output):
+ osutil._enable_firewall = True
+ util = osutil.DefaultOSUtil()
+
+ mock_output.side_effect = [
+ (0, "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)),
+ (0,
+'''
+
+Chain OUTPUT (policy ACCEPT 104 packets, 43628 bytes)
+ pkts bytes target prot opt in out source destination
+ 0 0 ACCEPT tcp -- any any anywhere 168.63.129.16 owner UID match daemon
+ 32 1920 DROP tcp -- any any anywhere 168.63.129.16
+
+''')]
+ dst = '168.63.129.16'
+
+ self.assertEqual(32, util.get_firewall_dropped_packets(dst))
+
@patch('os.getuid', return_value=42)
@patch('azurelinuxagent.common.utils.shellutil.run_get_output')
@patch('azurelinuxagent.common.utils.shellutil.run')
@@ -592,6 +648,34 @@ Match host 192.168.1.2\n\
])
self.assertFalse(osutil._enable_firewall)
+ @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
+ @patch('azurelinuxagent.common.utils.shellutil.run')
+ def test_enable_firewall_checks_for_invalid_iptables_options(self, mock_run, mock_output):
+ osutil._enable_firewall = True
+ util = osutil.DefaultOSUtil()
+
+ dst = '1.2.3.4'
+ version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)
+ wait = "-w"
+
+ # iptables uses the following exit codes
+ # 0 - correct function
+ # 1 - other errors
+ # 2 - errors which appear to be caused by invalid or abused command
+ # line parameters
+ mock_run.side_effect = [2]
+ mock_output.return_value = (0, version)
+
+ self.assertFalse(util.enable_firewall(dst_ip='1.2.3.4', uid=42))
+ self.assertFalse(osutil._enable_firewall)
+
+ mock_run.assert_has_calls([
+ call(osutil.FIREWALL_DROP.format(wait, "C", dst), chk_err=False),
+ ])
+ mock_output.assert_has_calls([
+ call(osutil.IPTABLES_VERSION)
+ ])
+
@patch('os.getuid', return_value=42)
@patch('azurelinuxagent.common.utils.shellutil.run_get_output')
@patch('azurelinuxagent.common.utils.shellutil.run')
@@ -624,17 +708,54 @@ Match host 192.168.1.2\n\
version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)
wait = "-w"
- mock_run.side_effect = [0, 0]
+ mock_run.side_effect = [0, 1, 0, 1]
mock_output.side_effect = [(0, version), (0, "Output")]
- self.assertTrue(util.remove_firewall())
+ self.assertTrue(util.remove_firewall(dst, uid))
mock_run.assert_has_calls([
- call(osutil.FIREWALL_FLUSH.format(wait), chk_err=False)
+ call(osutil.FIREWALL_DELETE_CONNTRACK.format(wait, dst), chk_err=False),
+ call(osutil.FIREWALL_DELETE_CONNTRACK.format(wait, dst), chk_err=False),
+ call(osutil.FIREWALL_DELETE_OWNER.format(wait, dst, uid), chk_err=False),
+ call(osutil.FIREWALL_DELETE_OWNER.format(wait, dst, uid), chk_err=False),
])
mock_output.assert_has_calls([
call(osutil.IPTABLES_VERSION)
])
self.assertTrue(osutil._enable_firewall)
+ @patch('os.getuid', return_value=42)
+ @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
+ @patch('azurelinuxagent.common.utils.shellutil.run')
+ def test_remove_firewall_does_not_repeat(self, mock_run, mock_output, _):
+ osutil._enable_firewall = True
+ util = osutil.DefaultOSUtil()
+
+ dst_ip='1.2.3.4'
+ uid=42
+ version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)
+ wait = "-w"
+
+ mock_run.side_effect = [2]
+ mock_output.side_effect = [(0, version), (1, "Output")]
+ self.assertFalse(util.remove_firewall(dst_ip, uid))
+
+ mock_run.assert_has_calls([
+ call(osutil.FIREWALL_DELETE_CONNTRACK.format(wait, dst_ip), chk_err=False),
+ ])
+ mock_output.assert_has_calls([
+ call(osutil.IPTABLES_VERSION)
+ ])
+ self.assertFalse(osutil._enable_firewall)
+
+ self.assertTrue(mock_run.call_count == 1)
+ self.assertTrue(mock_output.call_count == 1)
+
+ self.assertFalse(util.remove_firewall())
+ self.assertFalse(util.remove_firewall())
+
+ self.assertTrue(mock_run.call_count == 1)
+ self.assertTrue(mock_output.call_count == 1)
+
+
if __name__ == '__main__':
unittest.main()