summaryrefslogtreecommitdiff
path: root/tests/common
diff options
context:
space:
mode:
Diffstat (limited to 'tests/common')
-rw-r--r--tests/common/osutil/test_default.py229
-rw-r--r--tests/common/test_conf.py51
-rw-r--r--tests/common/test_event.py117
3 files changed, 364 insertions, 33 deletions
diff --git a/tests/common/osutil/test_default.py b/tests/common/osutil/test_default.py
index 87acc60..ec4408b 100644
--- a/tests/common/osutil/test_default.py
+++ b/tests/common/osutil/test_default.py
@@ -25,6 +25,7 @@ from azurelinuxagent.common.exception import OSUtilError
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.utils import fileutil
+from azurelinuxagent.common.utils.flexible_version import FlexibleVersion
from tests.tools import *
@@ -112,6 +113,21 @@ class TestOSUtil(AgentTestCase):
self.assertFalse(osutil.DefaultOSUtil().is_primary_interface('lo'))
self.assertTrue(osutil.DefaultOSUtil().is_primary_interface('eth0'))
+ def test_sriov(self):
+ routing_table = "\
+ Iface Destination Gateway Flags RefCnt Use Metric Mask MTU Window IRTT \n" \
+ "bond0 00000000 0100000A 0003 0 0 0 00000000 0 0 0 \n" \
+ "bond0 0000000A 00000000 0001 0 0 0 00000000 0 0 0 \n" \
+ "eth0 0000000A 00000000 0001 0 0 0 00000000 0 0 0 \n" \
+ "bond0 10813FA8 0100000A 0007 0 0 0 00000000 0 0 0 \n" \
+ "bond0 FEA9FEA9 0100000A 0007 0 0 0 00000000 0 0 0 \n"
+
+ mo = mock.mock_open(read_data=routing_table)
+ with patch(open_patch(), mo):
+ self.assertFalse(osutil.DefaultOSUtil().is_primary_interface('eth0'))
+ self.assertTrue(osutil.DefaultOSUtil().is_primary_interface('bond0'))
+
+
def test_multiple_default_routes(self):
routing_table = "\
Iface Destination Gateway Flags RefCnt Use Metric Mask MTU Window IRTT \n\
@@ -362,23 +378,50 @@ Match host 192.168.1.2\n\
conf.get_sshd_conf_file_path(),
expected_output)
+ def test_correct_instance_id(self):
+ util = osutil.DefaultOSUtil()
+ self.assertEqual(
+ "12345678-1234-1234-1234-123456789012",
+ util._correct_instance_id("78563412-3412-3412-1234-123456789012"))
+ self.assertEqual(
+ "D0DF4C54-4ECB-4A4B-9954-5BDF3ED5C3B8",
+ util._correct_instance_id("544CDFD0-CB4E-4B4A-9954-5BDF3ED5C3B8"))
+
@patch('os.path.isfile', return_value=True)
@patch('azurelinuxagent.common.utils.fileutil.read_file',
- return_value="B9F3C233-9913-9F42-8EB3-BA656DF32502")
+ return_value="33C2F3B9-1399-429F-8EB3-BA656DF32502")
def test_get_instance_id_from_file(self, mock_read, mock_isfile):
util = osutil.DefaultOSUtil()
self.assertEqual(
- "B9F3C233-9913-9F42-8EB3-BA656DF32502",
+ util.get_instance_id(),
+ "B9F3C233-9913-9F42-8EB3-BA656DF32502")
+
+ @patch('os.path.isfile', return_value=True)
+ @patch('azurelinuxagent.common.utils.fileutil.read_file',
+ return_value="")
+ def test_get_instance_id_empty_from_file(self, mock_read, mock_isfile):
+ util = osutil.DefaultOSUtil()
+ self.assertEqual(
+ "",
+ util.get_instance_id())
+
+ @patch('os.path.isfile', return_value=True)
+ @patch('azurelinuxagent.common.utils.fileutil.read_file',
+ return_value="Value")
+ def test_get_instance_id_malformed_from_file(self, mock_read, mock_isfile):
+ util = osutil.DefaultOSUtil()
+ self.assertEqual(
+ "Value",
util.get_instance_id())
@patch('os.path.isfile', return_value=False)
@patch('azurelinuxagent.common.utils.shellutil.run_get_output',
- return_value=[0, 'B9F3C233-9913-9F42-8EB3-BA656DF32502'])
+ return_value=[0, '33C2F3B9-1399-429F-8EB3-BA656DF32502'])
def test_get_instance_id_from_dmidecode(self, mock_shell, mock_isfile):
util = osutil.DefaultOSUtil()
self.assertEqual(
- "B9F3C233-9913-9F42-8EB3-BA656DF32502",
- util.get_instance_id())
+ util.get_instance_id(),
+ "B9F3C233-9913-9F42-8EB3-BA656DF32502")
@patch('os.path.isfile', return_value=False)
@patch('azurelinuxagent.common.utils.shellutil.run_get_output',
@@ -394,5 +437,181 @@ Match host 192.168.1.2\n\
util = osutil.DefaultOSUtil()
self.assertEqual("", util.get_instance_id())
+ @patch('os.path.isfile', return_value=True)
+ @patch('azurelinuxagent.common.utils.fileutil.read_file')
+ def test_is_current_instance_id_from_file(self, mock_read, mock_isfile):
+ util = osutil.DefaultOSUtil()
+
+ mock_read.return_value = "B9F3C233-9913-9F42-8EB3-BA656DF32502"
+ self.assertTrue(util.is_current_instance_id(
+ "B9F3C233-9913-9F42-8EB3-BA656DF32502"))
+
+ mock_read.return_value = "33C2F3B9-1399-429F-8EB3-BA656DF32502"
+ self.assertTrue(util.is_current_instance_id(
+ "B9F3C233-9913-9F42-8EB3-BA656DF32502"))
+
+ @patch('os.path.isfile', return_value=False)
+ @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
+ def test_is_current_instance_id_from_dmidecode(self, mock_shell, mock_isfile):
+ util = osutil.DefaultOSUtil()
+
+ mock_shell.return_value = [0, 'B9F3C233-9913-9F42-8EB3-BA656DF32502']
+ self.assertTrue(util.is_current_instance_id(
+ "B9F3C233-9913-9F42-8EB3-BA656DF32502"))
+
+ mock_shell.return_value = [0, '33C2F3B9-1399-429F-8EB3-BA656DF32502']
+ self.assertTrue(util.is_current_instance_id(
+ "B9F3C233-9913-9F42-8EB3-BA656DF32502"))
+
+ @patch('azurelinuxagent.common.conf.get_sudoers_dir')
+ def test_conf_sudoer(self, mock_dir):
+ tmp_dir = tempfile.mkdtemp()
+ mock_dir.return_value = tmp_dir
+
+ util = osutil.DefaultOSUtil()
+
+ # Assert the sudoer line is added if missing
+ util.conf_sudoer("FooBar")
+ waagent_sudoers = os.path.join(tmp_dir, 'waagent')
+ self.assertTrue(os.path.isfile(waagent_sudoers))
+
+ count = -1
+ with open(waagent_sudoers, 'r') as f:
+ count = len(f.readlines())
+ self.assertEqual(1, count)
+
+ # Assert the line does not get added a second time
+ util.conf_sudoer("FooBar")
+
+ count = -1
+ with open(waagent_sudoers, 'r') as f:
+ count = len(f.readlines())
+ print("WRITING TO {0}".format(waagent_sudoers))
+ self.assertEqual(1, count)
+
+ @patch('os.getuid', return_value=42)
+ @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
+ @patch('azurelinuxagent.common.utils.shellutil.run')
+ def test_enable_firewall(self, mock_run, mock_output, mock_uid):
+ osutil._enable_firewall = True
+ util = osutil.DefaultOSUtil()
+
+ dst = '1.2.3.4'
+ uid = 42
+ version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)
+ wait = "-w"
+
+ mock_run.side_effect = [1, 0, 0]
+ mock_output.side_effect = [(0, version), (0, "Output")]
+ self.assertTrue(util.enable_firewall(dst_ip=dst, uid=uid))
+
+ mock_run.assert_has_calls([
+ call(osutil.FIREWALL_DROP.format(wait, "C", dst), chk_err=False),
+ call(osutil.FIREWALL_ACCEPT.format(wait, "A", dst, uid)),
+ call(osutil.FIREWALL_DROP.format(wait, "A", dst))
+ ])
+ mock_output.assert_has_calls([
+ call(osutil.IPTABLES_VERSION),
+ call(osutil.FIREWALL_LIST.format(wait))
+ ])
+ self.assertTrue(osutil._enable_firewall)
+
+ @patch('os.getuid', return_value=42)
+ @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
+ @patch('azurelinuxagent.common.utils.shellutil.run')
+ def test_enable_firewall_no_wait(self, mock_run, mock_output, mock_uid):
+ osutil._enable_firewall = True
+ util = osutil.DefaultOSUtil()
+
+ dst = '1.2.3.4'
+ uid = 42
+ version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION-1)
+ wait = ""
+
+ mock_run.side_effect = [1, 0, 0]
+ mock_output.side_effect = [(0, version), (0, "Output")]
+ self.assertTrue(util.enable_firewall(dst_ip=dst, uid=uid))
+
+ mock_run.assert_has_calls([
+ call(osutil.FIREWALL_DROP.format(wait, "C", dst), chk_err=False),
+ call(osutil.FIREWALL_ACCEPT.format(wait, "A", dst, uid)),
+ call(osutil.FIREWALL_DROP.format(wait, "A", dst))
+ ])
+ mock_output.assert_has_calls([
+ call(osutil.IPTABLES_VERSION),
+ call(osutil.FIREWALL_LIST.format(wait))
+ ])
+ self.assertTrue(osutil._enable_firewall)
+
+ @patch('os.getuid', return_value=42)
+ @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
+ @patch('azurelinuxagent.common.utils.shellutil.run')
+ def test_enable_firewall_skips_if_drop_exists(self, mock_run, mock_output, mock_uid):
+ osutil._enable_firewall = True
+ util = osutil.DefaultOSUtil()
+
+ dst = '1.2.3.4'
+ uid = 42
+ version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)
+ wait = "-w"
+
+ mock_run.side_effect = [0, 0, 0]
+ mock_output.return_value = (0, version)
+ self.assertTrue(util.enable_firewall(dst_ip=dst, uid=uid))
+
+ mock_run.assert_has_calls([
+ call(osutil.FIREWALL_DROP.format(wait, "C", dst), chk_err=False),
+ ])
+ mock_output.assert_has_calls([
+ call(osutil.IPTABLES_VERSION)
+ ])
+ self.assertTrue(osutil._enable_firewall)
+
+ @patch('os.getuid', return_value=42)
+ @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
+ @patch('azurelinuxagent.common.utils.shellutil.run')
+ def test_enable_firewall_ignores_exceptions(self, mock_run, mock_output, mock_uid):
+ osutil._enable_firewall = True
+ util = osutil.DefaultOSUtil()
+
+ dst = '1.2.3.4'
+ uid = 42
+ version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)
+ wait = "-w"
+
+ mock_run.side_effect = [1, Exception]
+ mock_output.return_value = (0, version)
+ self.assertFalse(util.enable_firewall(dst_ip=dst, uid=uid))
+
+ mock_run.assert_has_calls([
+ call(osutil.FIREWALL_DROP.format(wait, "C", dst), chk_err=False),
+ call(osutil.FIREWALL_ACCEPT.format(wait, "A", dst, uid))
+ ])
+ mock_output.assert_has_calls([
+ call(osutil.IPTABLES_VERSION)
+ ])
+ self.assertFalse(osutil._enable_firewall)
+
+ @patch('os.getuid', return_value=42)
+ @patch('azurelinuxagent.common.utils.shellutil.run_get_output')
+ @patch('azurelinuxagent.common.utils.shellutil.run')
+ def test_enable_firewall_skips_if_disabled(self, mock_run, mock_output, mock_uid):
+ osutil._enable_firewall = False
+ util = osutil.DefaultOSUtil()
+
+ dst = '1.2.3.4'
+ uid = 42
+ version = "iptables v{0}".format(osutil.IPTABLES_LOCKING_VERSION)
+ wait = "-w"
+
+ mock_run.side_effect = [1, 0, 0]
+ mock_output.side_effect = [(0, version), (0, "Output")]
+ self.assertFalse(util.enable_firewall(dst_ip=dst, uid=uid))
+
+ mock_run.assert_not_called()
+ mock_output.assert_not_called()
+ mock_uid.assert_not_called()
+ self.assertFalse(osutil._enable_firewall)
+
if __name__ == '__main__':
unittest.main()
diff --git a/tests/common/test_conf.py b/tests/common/test_conf.py
index 1287b0d..93759de 100644
--- a/tests/common/test_conf.py
+++ b/tests/common/test_conf.py
@@ -24,6 +24,49 @@ from tests.tools import *
class TestConf(AgentTestCase):
+ # Note:
+ # -- These values *MUST* match those from data/test_waagent.conf
+ EXPECTED_CONFIGURATION = {
+ "Provisioning.Enabled" : True,
+ "Provisioning.UseCloudInit" : True,
+ "Provisioning.DeleteRootPassword" : True,
+ "Provisioning.RegenerateSshHostKeyPair" : True,
+ "Provisioning.SshHostKeyPairType" : "rsa",
+ "Provisioning.MonitorHostName" : True,
+ "Provisioning.DecodeCustomData" : False,
+ "Provisioning.ExecuteCustomData" : False,
+ "Provisioning.PasswordCryptId" : '6',
+ "Provisioning.PasswordCryptSaltLength" : 10,
+ "Provisioning.AllowResetSysUser" : False,
+ "ResourceDisk.Format" : True,
+ "ResourceDisk.Filesystem" : "ext4",
+ "ResourceDisk.MountPoint" : "/mnt/resource",
+ "ResourceDisk.EnableSwap" : False,
+ "ResourceDisk.SwapSizeMB" : 0,
+ "ResourceDisk.MountOptions" : None,
+ "Logs.Verbose" : False,
+ "OS.EnableFIPS" : True,
+ "OS.RootDeviceScsiTimeout" : '300',
+ "OS.OpensslPath" : '/usr/bin/openssl',
+ "OS.SshDir" : "/notareal/path",
+ "HttpProxy.Host" : None,
+ "HttpProxy.Port" : None,
+ "DetectScvmmEnv" : False,
+ "Lib.Dir" : "/var/lib/waagent",
+ "DVD.MountPoint" : "/mnt/cdrom/secure",
+ "Pid.File" : "/var/run/waagent.pid",
+ "Extension.LogDir" : "/var/log/azure",
+ "OS.HomeDir" : "/home",
+ "OS.EnableRDMA" : False,
+ "OS.UpdateRdmaDriver" : False,
+ "OS.CheckRdmaDriver" : False,
+ "AutoUpdate.Enabled" : True,
+ "AutoUpdate.GAFamily" : "Prod",
+ "EnableOverProvisioning" : False,
+ "OS.AllowHTTP" : False,
+ "OS.EnableFirewall" : True
+ }
+
def setUp(self):
AgentTestCase.setUp(self)
self.conf = ConfigurationProvider()
@@ -59,3 +102,11 @@ class TestConf(AgentTestCase):
def test_get_provision_cloudinit(self):
self.assertTrue(get_provision_cloudinit(self.conf))
+
+ def test_get_configuration(self):
+ configuration = conf.get_configuration(self.conf)
+ self.assertTrue(len(configuration.keys()) > 0)
+ for k in TestConf.EXPECTED_CONFIGURATION.keys():
+ self.assertEqual(
+ TestConf.EXPECTED_CONFIGURATION[k],
+ configuration[k])
diff --git a/tests/common/test_event.py b/tests/common/test_event.py
index a485edf..55a99c4 100644
--- a/tests/common/test_event.py
+++ b/tests/common/test_event.py
@@ -22,7 +22,8 @@ from datetime import datetime
import azurelinuxagent.common.event as event
import azurelinuxagent.common.logger as logger
-from azurelinuxagent.common.event import init_event_logger, add_event
+from azurelinuxagent.common.event import add_event, \
+ mark_event_status, should_emit_event
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.version import CURRENT_VERSION
@@ -30,10 +31,84 @@ from tests.tools import *
class TestEvent(AgentTestCase):
+ def test_event_status_event_marked(self):
+ es = event.__event_status__
+
+ self.assertFalse(es.event_marked("Foo", "1.2", "FauxOperation"))
+ es.mark_event_status("Foo", "1.2", "FauxOperation", True)
+ self.assertTrue(es.event_marked("Foo", "1.2", "FauxOperation"))
+
+ event.__event_status__ = event.EventStatus()
+ event.init_event_status(self.tmp_dir)
+ es = event.__event_status__
+ self.assertTrue(es.event_marked("Foo", "1.2", "FauxOperation"))
+
+ def test_event_status_defaults_to_success(self):
+ es = event.__event_status__
+ self.assertTrue(es.event_succeeded("Foo", "1.2", "FauxOperation"))
+
+ def test_event_status_records_status(self):
+ d = tempfile.mkdtemp()
+ es = event.EventStatus(tempfile.mkdtemp())
+
+ es.mark_event_status("Foo", "1.2", "FauxOperation", True)
+ self.assertTrue(es.event_succeeded("Foo", "1.2", "FauxOperation"))
+
+ es.mark_event_status("Foo", "1.2", "FauxOperation", False)
+ self.assertFalse(es.event_succeeded("Foo", "1.2", "FauxOperation"))
+
+ def test_event_status_preserves_state(self):
+ es = event.__event_status__
+
+ es.mark_event_status("Foo", "1.2", "FauxOperation", False)
+ self.assertFalse(es.event_succeeded("Foo", "1.2", "FauxOperation"))
+
+ event.__event_status__ = event.EventStatus()
+ event.init_event_status(self.tmp_dir)
+ es = event.__event_status__
+ self.assertFalse(es.event_succeeded("Foo", "1.2", "FauxOperation"))
+
+ def test_should_emit_event_ignores_unknown_operations(self):
+ event.__event_status__ = event.EventStatus(tempfile.mkdtemp())
+
+ self.assertTrue(event.should_emit_event("Foo", "1.2", "FauxOperation", True))
+ self.assertTrue(event.should_emit_event("Foo", "1.2", "FauxOperation", False))
+
+ # Marking the event has no effect
+ event.mark_event_status("Foo", "1.2", "FauxOperation", True)
+
+ self.assertTrue(event.should_emit_event("Foo", "1.2", "FauxOperation", True))
+ self.assertTrue(event.should_emit_event("Foo", "1.2", "FauxOperation", False))
+
+
+ def test_should_emit_event_handles_known_operations(self):
+ event.__event_status__ = event.EventStatus(tempfile.mkdtemp())
+
+ # Known operations always initially "fire"
+ for op in event.__event_status_operations__:
+ self.assertTrue(event.should_emit_event("Foo", "1.2", op, True))
+ self.assertTrue(event.should_emit_event("Foo", "1.2", op, False))
+
+ # Note a success event...
+ for op in event.__event_status_operations__:
+ event.mark_event_status("Foo", "1.2", op, True)
+
+ # Subsequent success events should not fire, but failures will
+ for op in event.__event_status_operations__:
+ self.assertFalse(event.should_emit_event("Foo", "1.2", op, True))
+ self.assertTrue(event.should_emit_event("Foo", "1.2", op, False))
+
+ # Note a failure event...
+ for op in event.__event_status_operations__:
+ event.mark_event_status("Foo", "1.2", op, False)
+
+ # Subsequent success events fire and failure do not
+ for op in event.__event_status_operations__:
+ self.assertTrue(event.should_emit_event("Foo", "1.2", op, True))
+ self.assertFalse(event.should_emit_event("Foo", "1.2", op, False))
@patch('azurelinuxagent.common.event.EventLogger.add_event')
def test_periodic_emits_if_not_previously_sent(self, mock_event):
- init_event_logger(tempfile.mkdtemp())
event.__event_logger__.reset_periodic()
event.add_periodic(logger.EVERY_DAY, "FauxEvent")
@@ -41,7 +116,6 @@ class TestEvent(AgentTestCase):
@patch('azurelinuxagent.common.event.EventLogger.add_event')
def test_periodic_does_not_emit_if_previously_sent(self, mock_event):
- init_event_logger(tempfile.mkdtemp())
event.__event_logger__.reset_periodic()
event.add_periodic(logger.EVERY_DAY, "FauxEvent")
@@ -52,7 +126,6 @@ class TestEvent(AgentTestCase):
@patch('azurelinuxagent.common.event.EventLogger.add_event')
def test_periodic_emits_if_forced(self, mock_event):
- init_event_logger(tempfile.mkdtemp())
event.__event_logger__.reset_periodic()
event.add_periodic(logger.EVERY_DAY, "FauxEvent")
@@ -63,7 +136,6 @@ class TestEvent(AgentTestCase):
@patch('azurelinuxagent.common.event.EventLogger.add_event')
def test_periodic_emits_after_elapsed_delta(self, mock_event):
- init_event_logger(tempfile.mkdtemp())
event.__event_logger__.reset_periodic()
event.add_periodic(logger.EVERY_DAY, "FauxEvent")
@@ -73,14 +145,13 @@ class TestEvent(AgentTestCase):
self.assertEqual(1, mock_event.call_count)
h = hash("FauxEvent"+""+ustr(True)+"")
- event.__event_logger__.periodic_messages[h] = \
+ event.__event_logger__.periodic_events[h] = \
datetime.now() - logger.EVERY_DAY - logger.EVERY_HOUR
event.add_periodic(logger.EVERY_DAY, "FauxEvent")
self.assertEqual(2, mock_event.call_count)
@patch('azurelinuxagent.common.event.EventLogger.add_event')
def test_periodic_forwards_args(self, mock_event):
- init_event_logger(tempfile.mkdtemp())
event.__event_logger__.reset_periodic()
event.add_periodic(logger.EVERY_DAY, "FauxEvent")
@@ -90,68 +161,58 @@ class TestEvent(AgentTestCase):
log_event=True, message='', op='', version=str(CURRENT_VERSION))
def test_save_event(self):
- tmp_evt = tempfile.mkdtemp()
- init_event_logger(tmp_evt)
add_event('test', message='test event')
- self.assertTrue(len(os.listdir(tmp_evt)) == 1)
- shutil.rmtree(tmp_evt)
+ self.assertTrue(len(os.listdir(self.tmp_dir)) == 1)
def test_save_event_rollover(self):
- tmp_evt = tempfile.mkdtemp()
- init_event_logger(tmp_evt)
add_event('test', message='first event')
for i in range(0, 999):
add_event('test', message='test event {0}'.format(i))
- events = os.listdir(tmp_evt)
+ events = os.listdir(self.tmp_dir)
events.sort()
self.assertTrue(len(events) == 1000)
- first_event = os.path.join(tmp_evt, events[0])
+ first_event = os.path.join(self.tmp_dir, events[0])
with open(first_event) as first_fh:
first_event_text = first_fh.read()
self.assertTrue('first event' in first_event_text)
add_event('test', message='last event')
- events = os.listdir(tmp_evt)
+ events = os.listdir(self.tmp_dir)
events.sort()
self.assertTrue(len(events) == 1000, "{0} events found, 1000 expected".format(len(events)))
- first_event = os.path.join(tmp_evt, events[0])
+ first_event = os.path.join(self.tmp_dir, events[0])
with open(first_event) as first_fh:
first_event_text = first_fh.read()
self.assertFalse('first event' in first_event_text)
self.assertTrue('test event 0' in first_event_text)
- last_event = os.path.join(tmp_evt, events[-1])
+ last_event = os.path.join(self.tmp_dir, events[-1])
with open(last_event) as last_fh:
last_event_text = last_fh.read()
self.assertTrue('last event' in last_event_text)
- shutil.rmtree(tmp_evt)
-
def test_save_event_cleanup(self):
- tmp_evt = tempfile.mkdtemp()
- init_event_logger(tmp_evt)
-
for i in range(0, 2000):
- evt = os.path.join(tmp_evt, '{0}.tld'.format(ustr(1491004920536531 + i)))
+ evt = os.path.join(self.tmp_dir, '{0}.tld'.format(ustr(1491004920536531 + i)))
with open(evt, 'w') as fh:
fh.write('test event {0}'.format(i))
- events = os.listdir(tmp_evt)
+ events = os.listdir(self.tmp_dir)
self.assertTrue(len(events) == 2000, "{0} events found, 2000 expected".format(len(events)))
add_event('test', message='last event')
- events = os.listdir(tmp_evt)
+ events = os.listdir(self.tmp_dir)
events.sort()
self.assertTrue(len(events) == 1000, "{0} events found, 1000 expected".format(len(events)))
- first_event = os.path.join(tmp_evt, events[0])
+ first_event = os.path.join(self.tmp_dir, events[0])
with open(first_event) as first_fh:
first_event_text = first_fh.read()
self.assertTrue('test event 1001' in first_event_text)
- last_event = os.path.join(tmp_evt, events[-1])
+ last_event = os.path.join(self.tmp_dir, events[-1])
with open(last_event) as last_fh:
last_event_text = last_fh.read()
self.assertTrue('last event' in last_event_text)