summaryrefslogtreecommitdiff
path: root/azurelinuxagent/event.py
diff options
context:
space:
mode:
Diffstat (limited to 'azurelinuxagent/event.py')
-rw-r--r--azurelinuxagent/event.py171
1 files changed, 52 insertions, 119 deletions
diff --git a/azurelinuxagent/event.py b/azurelinuxagent/event.py
index 02e8017..f38b242 100644
--- a/azurelinuxagent/event.py
+++ b/azurelinuxagent/event.py
@@ -25,14 +25,15 @@ import datetime
import threading
import platform
import azurelinuxagent.logger as logger
-from azurelinuxagent.future import text
-import azurelinuxagent.protocol as prot
+from azurelinuxagent.exception import EventError, ProtocolError
+from azurelinuxagent.future import ustr
+from azurelinuxagent.protocol.restapi import TelemetryEventParam, \
+ TelemetryEventList, \
+ TelemetryEvent, \
+ set_properties, get_properties
from azurelinuxagent.metadata import DISTRO_NAME, DISTRO_VERSION, \
DISTRO_CODE_NAME, AGENT_VERSION
-from azurelinuxagent.utils.osutil import OSUTIL
-class EventError(Exception):
- pass
class WALAEventOperation:
HeartBeat="HeartBeat"
@@ -47,132 +48,65 @@ class WALAEventOperation:
ActivateResourceDisk="ActivateResourceDisk"
UnhandledError="UnhandledError"
-class EventMonitor(object):
+class EventLogger(object):
def __init__(self):
- self.sysinfo = []
- self.event_dir = os.path.join(OSUTIL.get_lib_dir(), "events")
+ self.event_dir = None
- def init_sysinfo(self):
- osversion = "{0}:{1}-{2}-{3}:{4}".format(platform.system(),
- DISTRO_NAME,
- DISTRO_VERSION,
- DISTRO_CODE_NAME,
- platform.release())
+ def save_event(self, data):
+ if self.event_dir is None:
+ logger.warn("Event reporter is not initialized.")
+ return
- self.sysinfo.append(prot.TelemetryEventParam("OSVersion", osversion))
- self.sysinfo.append(prot.TelemetryEventParam("GAVersion",
- AGENT_VERSION))
- self.sysinfo.append(prot.TelemetryEventParam("RAM",
- OSUTIL.get_total_mem()))
- self.sysinfo.append(prot.TelemetryEventParam("Processors",
- OSUTIL.get_processor_cores()))
- try:
- protocol = prot.FACTORY.get_default_protocol()
- vminfo = protocol.get_vminfo()
- self.sysinfo.append(prot.TelemetryEventParam("VMName",
- vminfo.vmName))
- #TODO add other system info like, subscription id, etc.
- except prot.ProtocolError as e:
- logger.warn("Failed to get vm info: {0}", e)
-
- def start(self):
- event_thread = threading.Thread(target = self.run)
- event_thread.setDaemon(True)
- event_thread.start()
+ if not os.path.exists(self.event_dir):
+ os.mkdir(self.event_dir)
+ os.chmod(self.event_dir, 0o700)
+ if len(os.listdir(self.event_dir)) > 1000:
+ raise EventError("Too many files under: {0}".format(self.event_dir))
- def collect_event(self, evt_file_name):
+ filename = os.path.join(self.event_dir, ustr(int(time.time()*1000000)))
try:
- logger.verb("Found event file: {0}", evt_file_name)
- with open(evt_file_name, "rb") as evt_file:
- #if fail to open or delete the file, throw exception
- json_str = evt_file.read().decode("utf-8",'ignore')
- logger.verb("Processed event file: {0}", evt_file_name)
- os.remove(evt_file_name)
- return json_str
+ with open(filename+".tmp",'wb+') as hfile:
+ hfile.write(data.encode("utf-8"))
+ os.rename(filename+".tmp", filename+".tld")
except IOError as e:
- msg = "Failed to process {0}, {1}".format(evt_file_name, e)
- raise EventError(msg)
-
- def collect_and_send_events(self):
- event_list = prot.TelemetryEventList()
- event_files = os.listdir(self.event_dir)
- for event_file in event_files:
- if not event_file.endswith(".tld"):
- continue
- event_file_path = os.path.join(self.event_dir, event_file)
- try:
- data_str = self.collect_event(event_file_path)
- except EventError as e:
- logger.error("{0}", e)
- continue
- try:
- data = json.loads(data_str)
- except ValueError as e:
- logger.verb(data_str)
- logger.error("Failed to decode json event file: {0}", e)
- continue
-
- event = prot.TelemetryEvent()
- prot.set_properties("event", event, data)
- event.parameters.extend(self.sysinfo)
- event_list.events.append(event)
- if len(event_list.events) == 0:
- return
-
+ raise EventError("Failed to write events to file:{0}", e)
+
+ def add_event(self, name, op="", is_success=True, duration=0, version="1.0",
+ message="", evt_type="", is_internal=False):
+ event = TelemetryEvent(1, "69B669B9-4AF8-4C50-BDC4-6006FA76E975")
+ event.parameters.append(TelemetryEventParam('Name', name))
+ event.parameters.append(TelemetryEventParam('Version', version))
+ event.parameters.append(TelemetryEventParam('IsInternal', is_internal))
+ event.parameters.append(TelemetryEventParam('Operation', op))
+ event.parameters.append(TelemetryEventParam('OperationSuccess',
+ is_success))
+ event.parameters.append(TelemetryEventParam('Message', message))
+ event.parameters.append(TelemetryEventParam('Duration', duration))
+ event.parameters.append(TelemetryEventParam('ExtensionType', evt_type))
+
+ data = get_properties(event)
try:
- protocol = prot.FACTORY.get_default_protocol()
- protocol.report_event(event_list)
- except prot.ProtocolError as e:
+ self.save_event(json.dumps(data))
+ except EventError as e:
logger.error("{0}", e)
- def run(self):
- self.init_sysinfo()
- last_heartbeat = datetime.datetime.min
- period = datetime.timedelta(hours = 12)
- while(True):
- if (datetime.datetime.now()-last_heartbeat) > period:
- last_heartbeat = datetime.datetime.now()
- add_event(op=WALAEventOperation.HeartBeat,
- name="WALA",is_success=True)
- self.collect_and_send_events()
- time.sleep(60)
-
-def save_event(data):
- event_dir = os.path.join(OSUTIL.get_lib_dir(), 'events')
- if not os.path.exists(event_dir):
- os.mkdir(event_dir)
- os.chmod(event_dir,0o700)
- if len(os.listdir(event_dir)) > 1000:
- raise EventError("Too many files under: {0}", event_dir)
-
- filename = os.path.join(event_dir, text(int(time.time()*1000000)))
- try:
- with open(filename+".tmp",'wb+') as hfile:
- hfile.write(data.encode("utf-8"))
- os.rename(filename+".tmp", filename+".tld")
- except IOError as e:
- raise EventError("Failed to write events to file:{0}", e)
+__event_logger__ = EventLogger()
def add_event(name, op="", is_success=True, duration=0, version="1.0",
- message="", evt_type="", is_internal=False):
+ message="", evt_type="", is_internal=False,
+ reporter=__event_logger__):
log = logger.info if is_success else logger.error
log("Event: name={0}, op={1}, message={2}", name, op, message)
- event = prot.TelemetryEvent(1, "69B669B9-4AF8-4C50-BDC4-6006FA76E975")
- event.parameters.append(prot.TelemetryEventParam('Name', name))
- event.parameters.append(prot.TelemetryEventParam('Version', version))
- event.parameters.append(prot.TelemetryEventParam('IsInternal', is_internal))
- event.parameters.append(prot.TelemetryEventParam('Operation', op))
- event.parameters.append(prot.TelemetryEventParam('OperationSuccess',
- is_success))
- event.parameters.append(prot.TelemetryEventParam('Message', message))
- event.parameters.append(prot.TelemetryEventParam('Duration', duration))
- event.parameters.append(prot.TelemetryEventParam('ExtensionType', evt_type))
- data = prot.get_properties(event)
- try:
- save_event(json.dumps(data))
- except EventError as e:
- logger.error("{0}", e)
+ if reporter.event_dir is None:
+ logger.warn("Event reporter is not initialized.")
+ return
+ reporter.add_event(name, op=op, is_success=is_success, duration=duration,
+ version=version, message=message, evt_type=evt_type,
+ is_internal=is_internal)
+
+def init_event_logger(event_dir, reporter=__event_logger__):
+ reporter.event_dir = event_dir
def dump_unhandled_err(name):
if hasattr(sys, 'last_type') and hasattr(sys, 'last_value') and \
@@ -184,8 +118,7 @@ def dump_unhandled_err(name):
last_traceback)
message= "".join(error)
add_event(name, is_success=False, message=message,
- op=WALAEventOperation.UnhandledError)
+ op=WALAEventOperation.UnhandledError)
def enable_unhandled_err_dump(name):
atexit.register(dump_unhandled_err, name)
-