diff options
Diffstat (limited to 'azurelinuxagent/event.py')
-rw-r--r-- | azurelinuxagent/event.py | 171 |
1 files changed, 52 insertions, 119 deletions
diff --git a/azurelinuxagent/event.py b/azurelinuxagent/event.py index 02e8017..f38b242 100644 --- a/azurelinuxagent/event.py +++ b/azurelinuxagent/event.py @@ -25,14 +25,15 @@ import datetime import threading import platform import azurelinuxagent.logger as logger -from azurelinuxagent.future import text -import azurelinuxagent.protocol as prot +from azurelinuxagent.exception import EventError, ProtocolError +from azurelinuxagent.future import ustr +from azurelinuxagent.protocol.restapi import TelemetryEventParam, \ + TelemetryEventList, \ + TelemetryEvent, \ + set_properties, get_properties from azurelinuxagent.metadata import DISTRO_NAME, DISTRO_VERSION, \ DISTRO_CODE_NAME, AGENT_VERSION -from azurelinuxagent.utils.osutil import OSUTIL -class EventError(Exception): - pass class WALAEventOperation: HeartBeat="HeartBeat" @@ -47,132 +48,65 @@ class WALAEventOperation: ActivateResourceDisk="ActivateResourceDisk" UnhandledError="UnhandledError" -class EventMonitor(object): +class EventLogger(object): def __init__(self): - self.sysinfo = [] - self.event_dir = os.path.join(OSUTIL.get_lib_dir(), "events") + self.event_dir = None - def init_sysinfo(self): - osversion = "{0}:{1}-{2}-{3}:{4}".format(platform.system(), - DISTRO_NAME, - DISTRO_VERSION, - DISTRO_CODE_NAME, - platform.release()) + def save_event(self, data): + if self.event_dir is None: + logger.warn("Event reporter is not initialized.") + return - self.sysinfo.append(prot.TelemetryEventParam("OSVersion", osversion)) - self.sysinfo.append(prot.TelemetryEventParam("GAVersion", - AGENT_VERSION)) - self.sysinfo.append(prot.TelemetryEventParam("RAM", - OSUTIL.get_total_mem())) - self.sysinfo.append(prot.TelemetryEventParam("Processors", - OSUTIL.get_processor_cores())) - try: - protocol = prot.FACTORY.get_default_protocol() - vminfo = protocol.get_vminfo() - self.sysinfo.append(prot.TelemetryEventParam("VMName", - vminfo.vmName)) - #TODO add other system info like, subscription id, etc. - except prot.ProtocolError as e: - logger.warn("Failed to get vm info: {0}", e) - - def start(self): - event_thread = threading.Thread(target = self.run) - event_thread.setDaemon(True) - event_thread.start() + if not os.path.exists(self.event_dir): + os.mkdir(self.event_dir) + os.chmod(self.event_dir, 0o700) + if len(os.listdir(self.event_dir)) > 1000: + raise EventError("Too many files under: {0}".format(self.event_dir)) - def collect_event(self, evt_file_name): + filename = os.path.join(self.event_dir, ustr(int(time.time()*1000000))) try: - logger.verb("Found event file: {0}", evt_file_name) - with open(evt_file_name, "rb") as evt_file: - #if fail to open or delete the file, throw exception - json_str = evt_file.read().decode("utf-8",'ignore') - logger.verb("Processed event file: {0}", evt_file_name) - os.remove(evt_file_name) - return json_str + with open(filename+".tmp",'wb+') as hfile: + hfile.write(data.encode("utf-8")) + os.rename(filename+".tmp", filename+".tld") except IOError as e: - msg = "Failed to process {0}, {1}".format(evt_file_name, e) - raise EventError(msg) - - def collect_and_send_events(self): - event_list = prot.TelemetryEventList() - event_files = os.listdir(self.event_dir) - for event_file in event_files: - if not event_file.endswith(".tld"): - continue - event_file_path = os.path.join(self.event_dir, event_file) - try: - data_str = self.collect_event(event_file_path) - except EventError as e: - logger.error("{0}", e) - continue - try: - data = json.loads(data_str) - except ValueError as e: - logger.verb(data_str) - logger.error("Failed to decode json event file: {0}", e) - continue - - event = prot.TelemetryEvent() - prot.set_properties("event", event, data) - event.parameters.extend(self.sysinfo) - event_list.events.append(event) - if len(event_list.events) == 0: - return - + raise EventError("Failed to write events to file:{0}", e) + + def add_event(self, name, op="", is_success=True, duration=0, version="1.0", + message="", evt_type="", is_internal=False): + event = TelemetryEvent(1, "69B669B9-4AF8-4C50-BDC4-6006FA76E975") + event.parameters.append(TelemetryEventParam('Name', name)) + event.parameters.append(TelemetryEventParam('Version', version)) + event.parameters.append(TelemetryEventParam('IsInternal', is_internal)) + event.parameters.append(TelemetryEventParam('Operation', op)) + event.parameters.append(TelemetryEventParam('OperationSuccess', + is_success)) + event.parameters.append(TelemetryEventParam('Message', message)) + event.parameters.append(TelemetryEventParam('Duration', duration)) + event.parameters.append(TelemetryEventParam('ExtensionType', evt_type)) + + data = get_properties(event) try: - protocol = prot.FACTORY.get_default_protocol() - protocol.report_event(event_list) - except prot.ProtocolError as e: + self.save_event(json.dumps(data)) + except EventError as e: logger.error("{0}", e) - def run(self): - self.init_sysinfo() - last_heartbeat = datetime.datetime.min - period = datetime.timedelta(hours = 12) - while(True): - if (datetime.datetime.now()-last_heartbeat) > period: - last_heartbeat = datetime.datetime.now() - add_event(op=WALAEventOperation.HeartBeat, - name="WALA",is_success=True) - self.collect_and_send_events() - time.sleep(60) - -def save_event(data): - event_dir = os.path.join(OSUTIL.get_lib_dir(), 'events') - if not os.path.exists(event_dir): - os.mkdir(event_dir) - os.chmod(event_dir,0o700) - if len(os.listdir(event_dir)) > 1000: - raise EventError("Too many files under: {0}", event_dir) - - filename = os.path.join(event_dir, text(int(time.time()*1000000))) - try: - with open(filename+".tmp",'wb+') as hfile: - hfile.write(data.encode("utf-8")) - os.rename(filename+".tmp", filename+".tld") - except IOError as e: - raise EventError("Failed to write events to file:{0}", e) +__event_logger__ = EventLogger() def add_event(name, op="", is_success=True, duration=0, version="1.0", - message="", evt_type="", is_internal=False): + message="", evt_type="", is_internal=False, + reporter=__event_logger__): log = logger.info if is_success else logger.error log("Event: name={0}, op={1}, message={2}", name, op, message) - event = prot.TelemetryEvent(1, "69B669B9-4AF8-4C50-BDC4-6006FA76E975") - event.parameters.append(prot.TelemetryEventParam('Name', name)) - event.parameters.append(prot.TelemetryEventParam('Version', version)) - event.parameters.append(prot.TelemetryEventParam('IsInternal', is_internal)) - event.parameters.append(prot.TelemetryEventParam('Operation', op)) - event.parameters.append(prot.TelemetryEventParam('OperationSuccess', - is_success)) - event.parameters.append(prot.TelemetryEventParam('Message', message)) - event.parameters.append(prot.TelemetryEventParam('Duration', duration)) - event.parameters.append(prot.TelemetryEventParam('ExtensionType', evt_type)) - data = prot.get_properties(event) - try: - save_event(json.dumps(data)) - except EventError as e: - logger.error("{0}", e) + if reporter.event_dir is None: + logger.warn("Event reporter is not initialized.") + return + reporter.add_event(name, op=op, is_success=is_success, duration=duration, + version=version, message=message, evt_type=evt_type, + is_internal=is_internal) + +def init_event_logger(event_dir, reporter=__event_logger__): + reporter.event_dir = event_dir def dump_unhandled_err(name): if hasattr(sys, 'last_type') and hasattr(sys, 'last_value') and \ @@ -184,8 +118,7 @@ def dump_unhandled_err(name): last_traceback) message= "".join(error) add_event(name, is_success=False, message=message, - op=WALAEventOperation.UnhandledError) + op=WALAEventOperation.UnhandledError) def enable_unhandled_err_dump(name): atexit.register(dump_unhandled_err, name) - |