summaryrefslogtreecommitdiff
path: root/tests/common
diff options
context:
space:
mode:
Diffstat (limited to 'tests/common')
-rw-r--r--tests/common/osutil/test_default.py55
-rw-r--r--tests/common/test_conf.py6
-rw-r--r--tests/common/test_errorstate.py69
-rw-r--r--tests/common/test_event.py7
-rw-r--r--tests/common/test_logger.py40
5 files changed, 13 insertions, 164 deletions
diff --git a/tests/common/osutil/test_default.py b/tests/common/osutil/test_default.py
index c9fa1de..a73f4b4 100644
--- a/tests/common/osutil/test_default.py
+++ b/tests/common/osutil/test_default.py
@@ -516,16 +516,6 @@ Match host 192.168.1.2\n\
self.assertEqual(-1, util.get_firewall_dropped_packets("not used"))
@patch('azurelinuxagent.common.utils.shellutil.run_get_output')
- def test_get_firewall_dropped_packets_transient_error_ignored(self, mock_output):
- osutil._enable_firewall = True
- util = osutil.DefaultOSUtil()
-
- mock_output.side_effect = [
- (0, "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)),
- (3, "can't initialize iptables table `security': iptables who? (do you need to insmod?)")]
- self.assertEqual(0, util.get_firewall_dropped_packets("not used"))
-
- @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
def test_get_firewall_dropped_packets(self, mock_output):
osutil._enable_firewall = True
util = osutil.DefaultOSUtil()
@@ -648,34 +638,6 @@ Chain OUTPUT (policy ACCEPT 104 packets, 43628 bytes)
])
self.assertFalse(osutil._enable_firewall)
- @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
- @patch('azurelinuxagent.common.utils.shellutil.run')
- def test_enable_firewall_checks_for_invalid_iptables_options(self, mock_run, mock_output):
- osutil._enable_firewall = True
- util = osutil.DefaultOSUtil()
-
- dst = '1.2.3.4'
- version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)
- wait = "-w"
-
- # iptables uses the following exit codes
- # 0 - correct function
- # 1 - other errors
- # 2 - errors which appear to be caused by invalid or abused command
- # line parameters
- mock_run.side_effect = [2]
- mock_output.return_value = (0, version)
-
- self.assertFalse(util.enable_firewall(dst_ip='1.2.3.4', uid=42))
- self.assertFalse(osutil._enable_firewall)
-
- mock_run.assert_has_calls([
- call(osutil.FIREWALL_DROP.format(wait, "C", dst), chk_err=False),
- ])
- mock_output.assert_has_calls([
- call(osutil.IPTABLES_VERSION)
- ])
-
@patch('os.getuid', return_value=42)
@patch('azurelinuxagent.common.utils.shellutil.run_get_output')
@patch('azurelinuxagent.common.utils.shellutil.run')
@@ -708,15 +670,12 @@ Chain OUTPUT (policy ACCEPT 104 packets, 43628 bytes)
version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)
wait = "-w"
- mock_run.side_effect = [0, 1, 0, 1]
+ mock_run.side_effect = [0, 0]
mock_output.side_effect = [(0, version), (0, "Output")]
- self.assertTrue(util.remove_firewall(dst, uid))
+ self.assertTrue(util.remove_firewall())
mock_run.assert_has_calls([
- call(osutil.FIREWALL_DELETE_CONNTRACK.format(wait, dst), chk_err=False),
- call(osutil.FIREWALL_DELETE_CONNTRACK.format(wait, dst), chk_err=False),
- call(osutil.FIREWALL_DELETE_OWNER.format(wait, dst, uid), chk_err=False),
- call(osutil.FIREWALL_DELETE_OWNER.format(wait, dst, uid), chk_err=False),
+ call(osutil.FIREWALL_FLUSH.format(wait), chk_err=True)
])
mock_output.assert_has_calls([
call(osutil.IPTABLES_VERSION)
@@ -730,17 +689,15 @@ Chain OUTPUT (policy ACCEPT 104 packets, 43628 bytes)
osutil._enable_firewall = True
util = osutil.DefaultOSUtil()
- dst_ip='1.2.3.4'
- uid=42
version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)
wait = "-w"
- mock_run.side_effect = [2]
+ mock_run.side_effect = [1, 0]
mock_output.side_effect = [(0, version), (1, "Output")]
- self.assertFalse(util.remove_firewall(dst_ip, uid))
+ self.assertFalse(util.remove_firewall())
mock_run.assert_has_calls([
- call(osutil.FIREWALL_DELETE_CONNTRACK.format(wait, dst_ip), chk_err=False),
+ call(osutil.FIREWALL_FLUSH.format(wait), chk_err=True)
])
mock_output.assert_has_calls([
call(osutil.IPTABLES_VERSION)
diff --git a/tests/common/test_conf.py b/tests/common/test_conf.py
index 057c83b..a2c6573 100644
--- a/tests/common/test_conf.py
+++ b/tests/common/test_conf.py
@@ -65,7 +65,7 @@ class TestConf(AgentTestCase):
"AutoUpdate.GAFamily" : "Prod",
"EnableOverProvisioning" : False,
"OS.AllowHTTP" : False,
- "OS.EnableFirewall" : False
+ "OS.EnableFirewall" : True
}
def setUp(self):
@@ -78,7 +78,6 @@ class TestConf(AgentTestCase):
def test_key_value_handling(self):
self.assertEqual("Value1", self.conf.get("FauxKey1", "Bad"))
self.assertEqual("Value2 Value2", self.conf.get("FauxKey2", "Bad"))
- self.assertEqual("delalloc,rw,noatime,nobarrier,users,mode=777", self.conf.get("FauxKey3", "Bad"))
def test_get_ssh_dir(self):
self.assertTrue(get_ssh_dir(self.conf).startswith("/notareal/path"))
@@ -111,5 +110,4 @@ class TestConf(AgentTestCase):
for k in TestConf.EXPECTED_CONFIGURATION.keys():
self.assertEqual(
TestConf.EXPECTED_CONFIGURATION[k],
- configuration[k],
- k)
+ configuration[k])
diff --git a/tests/common/test_errorstate.py b/tests/common/test_errorstate.py
deleted file mode 100644
index 7513fe5..0000000
--- a/tests/common/test_errorstate.py
+++ /dev/null
@@ -1,69 +0,0 @@
-from datetime import timedelta
-
-from azurelinuxagent.common.errorstate import *
-from tests.tools import *
-
-
-class TestErrorState(unittest.TestCase):
- def test_errorstate00(self):
- """
- If ErrorState is never incremented, it will never trigger.
- """
- test_subject = ErrorState(timedelta(seconds=10000))
- self.assertFalse(test_subject.is_triggered())
- self.assertEqual(0, test_subject.count)
-
- def test_errorstate01(self):
- """
- If ErrorState is never incremented, and the timedelta is zero it will
- not trigger.
- """
- test_subject = ErrorState(timedelta(seconds=0))
- self.assertFalse(test_subject.is_triggered())
- self.assertEqual(0, test_subject.count)
-
- def test_errorstate02(self):
- """
- If ErrorState is triggered, and the current time is within timedelta
- of now it will trigger.
- """
- test_subject = ErrorState(timedelta(seconds=0))
- test_subject.incr()
-
-
- self.assertTrue(test_subject.is_triggered())
- self.assertEqual(1, test_subject.count)
-
- @patch('azurelinuxagent.common.errorstate.datetime')
- def test_errorstate03(self, mock_time):
- """
- ErrorState will not trigger until
- 1. ErrorState has been incr() at least once.
- 2. The timedelta from the first incr() has elapsed.
- """
- test_subject = ErrorState(timedelta(minutes=15))
-
- for x in range(1, 10):
- mock_time.utcnow = Mock(return_value=datetime.utcnow() + timedelta(minutes=x))
-
- test_subject.incr()
- self.assertFalse(test_subject.is_triggered())
-
- mock_time.utcnow = Mock(return_value=datetime.utcnow() + timedelta(minutes=30))
- test_subject.incr()
- self.assertTrue(test_subject.is_triggered())
-
- def test_errorstate04(self):
- """
- If ErrorState is reset the timestamp of the last incr() is reset to
- None.
- """
-
- test_subject = ErrorState(timedelta(minutes=15))
- self.assertTrue(test_subject.timestamp is None)
-
- test_subject.incr()
- self.assertTrue(test_subject.timestamp is not None)
-
- test_subject.reset()
- self.assertTrue(test_subject.timestamp is None)
diff --git a/tests/common/test_event.py b/tests/common/test_event.py
index 01bcd7b..d52d3e6 100644
--- a/tests/common/test_event.py
+++ b/tests/common/test_event.py
@@ -49,7 +49,8 @@ class TestEvent(AgentTestCase):
self.assertTrue(es.event_succeeded("Foo", "1.2", "FauxOperation"))
def test_event_status_records_status(self):
- es = event.EventStatus()
+ d = tempfile.mkdtemp()
+ es = event.EventStatus(tempfile.mkdtemp())
es.mark_event_status("Foo", "1.2", "FauxOperation", True)
self.assertTrue(es.event_succeeded("Foo", "1.2", "FauxOperation"))
@@ -69,7 +70,7 @@ class TestEvent(AgentTestCase):
self.assertFalse(es.event_succeeded("Foo", "1.2", "FauxOperation"))
def test_should_emit_event_ignores_unknown_operations(self):
- event.__event_status__ = event.EventStatus()
+ event.__event_status__ = event.EventStatus(tempfile.mkdtemp())
self.assertTrue(event.should_emit_event("Foo", "1.2", "FauxOperation", True))
self.assertTrue(event.should_emit_event("Foo", "1.2", "FauxOperation", False))
@@ -82,7 +83,7 @@ class TestEvent(AgentTestCase):
def test_should_emit_event_handles_known_operations(self):
- event.__event_status__ = event.EventStatus()
+ event.__event_status__ = event.EventStatus(tempfile.mkdtemp())
# Known operations always initially "fire"
for op in event.__event_status_operations__:
diff --git a/tests/common/test_logger.py b/tests/common/test_logger.py
index 005c429..9e298b3 100644
--- a/tests/common/test_logger.py
+++ b/tests/common/test_logger.py
@@ -15,20 +15,17 @@
# Requires Python 2.4+ and Openssl 1.0+
#
-import json
from datetime import datetime
import azurelinuxagent.common.logger as logger
-from azurelinuxagent.common.event import add_log_event
-from azurelinuxagent.common.version import CURRENT_AGENT, CURRENT_VERSION
from tests.tools import *
_MSG = "This is our test logging message {0} {1}"
_DATA = ["arg1", "arg2"]
-
class TestLogger(AgentTestCase):
+
@patch('azurelinuxagent.common.logger.Logger.info')
def test_periodic_emits_if_not_previously_sent(self, mock_info):
logger.reset_periodic()
@@ -67,38 +64,3 @@ class TestLogger(AgentTestCase):
logger.periodic(logger.EVERY_DAY, _MSG, *_DATA)
mock_info.assert_called_once_with(_MSG, *_DATA)
-
- def test_telemetry_logger(self):
- mock = MagicMock()
- appender = logger.TelemetryAppender(logger.LogLevel.WARNING, mock)
- appender.write(logger.LogLevel.WARNING, "--unit-test--")
-
- mock.assert_called_once_with(logger.LogLevel.WARNING, "--unit-test--")
-
- @patch('azurelinuxagent.common.event.EventLogger.save_event')
- def test_telemetry_logger1(self, mock_save):
- appender = logger.TelemetryAppender(logger.LogLevel.WARNING, add_log_event)
- appender.write(logger.LogLevel.WARNING, "--unit-test--")
-
- self.assertEqual(1, mock_save.call_count)
- telemetry_json = json.loads(mock_save.call_args[0][0])
-
- self.assertEqual('FFF0196F-EE4C-4EAF-9AA5-776F622DEB4F', telemetry_json['providerId'])
- self.assertEqual(7, telemetry_json['eventId'])
-
- self.assertEqual(5, len(telemetry_json['parameters']))
- for x in telemetry_json['parameters']:
- if x['name'] == 'EventName':
- self.assertEqual(x['value'], 'Log')
-
- elif x['name'] == 'CapabilityUsed':
- self.assertEqual(x['value'], 'WARNING')
-
- elif x['name'] == 'Context1':
- self.assertEqual(x['value'], '--unit-test--')
-
- elif x['name'] == 'Context2':
- self.assertEqual(x['value'], '')
-
- elif x['name'] == 'Context3':
- self.assertEqual(x['value'], '')