diff options
Diffstat (limited to 'waagent')
-rw-r--r-- | waagent | 1136 |
1 files changed, 947 insertions, 189 deletions
@@ -2,7 +2,7 @@ # # Windows Azure Linux Agent # -# Copyright 2014 Microsoft Corporation +# Copyright 2015 Microsoft Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -# Requires Python 2.4+ and Openssl 1.0+ +# Requires Python 2.6+ and Openssl 1.0+ # # Implements parts of RFC 2131, 1541, 1497 and # http://msdn.microsoft.com/en-us/library/cc227282%28PROT.10%29.aspx @@ -48,6 +48,8 @@ import fcntl import inspect import zipfile import json +import datetime +import xml.sax.saxutils if not hasattr(subprocess,'check_output'): def check_output(*popenargs, **kwargs): @@ -78,7 +80,7 @@ if not hasattr(subprocess,'check_output'): GuestAgentName = "WALinuxAgent" GuestAgentLongName = "Windows Azure Linux Agent" -GuestAgentVersion = "WALinuxAgent-2.0.8" +GuestAgentVersion = "WALinuxAgent-2.0.12" ProtocolVersion = "2012-11-30" #WARNING this value is used to confirm the correct fabric protocol. Config = None @@ -170,12 +172,13 @@ class AbstractDistro(object): self.ssh_config_file='/etc/ssh/sshd_config' self.hostname_file_path='/etc/hostname' self.dhcp_client_name='dhclient' - self.requiredDeps = [ 'route', 'shutdown', 'ssh-keygen', 'useradd' - , 'openssl', 'sfdisk', 'fdisk', 'mkfs', 'chpasswd', 'sed', 'grep', 'sudo' ] + self.requiredDeps = [ 'route', 'shutdown', 'ssh-keygen', 'useradd', + 'openssl', 'sfdisk', 'fdisk', 'mkfs', 'chpasswd', + 'sed', 'grep', 'sudo', 'parted' ] self.init_script_file='/etc/init.d/waagent' self.agent_package_name='WALinuxAgent' self.fileBlackList = [ "/root/.bash_history", "/var/log/waagent.log",'/etc/resolv.conf' ] - self.agent_files_to_uninstall = ["/etc/waagent.conf", "/etc/logrotate.d/waagent", "/etc/sudoers.d/waagent"] + self.agent_files_to_uninstall = ["/etc/waagent.conf", "/etc/logrotate.d/waagent"] self.grubKernelBootOptionsFile = '/etc/default/grub' self.grubKernelBootOptionsLine = 'GRUB_CMDLINE_LINUX_DEFAULT=' self.getpidcmd = 'pidof' @@ -183,6 +186,7 @@ class AbstractDistro(object): self.sudoers_dir_base = '/etc' self.waagent_conf_file = WaagentConf self.shadow_file_mode=0600 + self.dhcp_enabled = False def isSelinuxSystem(self): """ @@ -471,13 +475,36 @@ class AbstractDistro(object): fs = Config.get("ResourceDisk.Filesystem") if fs == None: fs = "ext3" - if RunGetOutput("sfdisk -q -c " + device + " 1")[1].rstrip() == "7" and fs != "ntfs": - Run("sfdisk -c " + device + " 1 83") - Run("mkfs." + fs + " " + device + "1") - if Run("mount " + device + "1 " + mountpoint): - Error("ActivateResourceDisk: Failed to mount resource disk (" + device + "1).") - return - Log("Resource disk (" + device + "1) is mounted at " + mountpoint + " with fstype " + fs) + + partition = device + "1" + + #Check partition type + Log("Detect GPT...") + ret = RunGetOutput("parted {0} print".format(device)) + if ret[0] == 0 and "gpt" in ret[1]: + Log("GPT detected.") + #GPT(Guid Partition Table) is used. + #Get partitions. + parts = filter(lambda x : re.match("^\s*[0-9]+", x), ret[1].split("\n")) + #If there are more than 1 partitions, remove all partitions + #and create a new one using the entire disk space. + if len(parts) > 1: + for i in range(1, len(parts) + 1): + Run("parted {0} rm {1}".format(device, i)) + Run("parted {0} mkpart primary 0% 100%".format(device)) + Run("mkfs." + fs + " " + partition + " -F") + else: + existingFS = RunGetOutput("sfdisk -q -c " + device + " 1", chk_err=False)[1].rstrip() + if existingFS == "7" and fs != "ntfs": + Run("sfdisk -c " + device + " 1 83") + Run("mkfs." + fs + " " + partition) + if Run("mount " + partition + " " + mountpoint): + #If mount failed, try to format the partition and mount again + Run("mkfs." + fs + " " + partition + " -F") + if Run("mount " + partition + " " + mountpoint): + Error("ActivateResourceDisk: Failed to mount resource disk (" + partition + ").") + return + Log("Resource disk (" + partition + ") is mounted at " + mountpoint + " with fstype " + fs) #Create README file under the root of resource disk SetFileContents(os.path.join(mountpoint,README_FILENAME), README_FILECONTENT) @@ -555,6 +582,39 @@ class AbstractDistro(object): Error("Can't find host key: {0}".format(path)) return False + def isDHCPEnabled(self): + return self.dhcp_enabled + + def stopDHCP(self): + """ + Stop the system DHCP client so that tha agent can bind on its port. If + the distro has set dhcp_enabled to True, it will need to provide an + implementation of this method. + """ + raise NotImplementedError('stopDHCP method missing') + + def startDHCP(self): + """ + Start the system DHCP client. If the distro has set dhcp_enabled to + True, it will need to provide an implementation of this method. + """ + raise NotImplementedError('startDHCP method missing') + + def translateCustomData(self, data): + """ + Translate the custom data from a Base64 encoding. Default to no-op. + """ + return data + + def getConfigurationPath(self): + return "/etc/waagent.conf" + + def getProcessorCores(self): + return int(RunGetOutput("grep 'processor.*:' /proc/cpuinfo |wc -l")[1]) + + def getTotalMemory(self): + return int(RunGetOutput("grep MemTotal /proc/meminfo |awk '{print $2}'")[1])/1024 + ############################################################ # GentooDistro ############################################################ @@ -569,7 +629,7 @@ name="Windows Azure Linux Agent" depend() { - needs localmount + need localmount use logger network after bootmisc modules } @@ -759,11 +819,13 @@ class SuSEDistro(AbstractDistro): self.requiredDeps += [ "/sbin/insserv" ] self.init_file=suse_init_file self.dhcp_client_name='dhcpcd' - if (DistInfo(fullname=1)[0] == 'SUSE Linux Enterprise Server') and (DistInfo()[1] >= '12'): + if ((DistInfo(fullname=1)[0] == 'SUSE Linux Enterprise Server' and DistInfo()[1] >= '12') or \ + (DistInfo(fullname=1)[0] == 'openSUSE' and DistInfo()[1] >= '13.2')): self.dhcp_client_name='wickedd-dhcp4' self.grubKernelBootOptionsFile = '/boot/grub/menu.lst' self.grubKernelBootOptionsLine = 'kernel' self.getpidcmd='pidof ' + self.dhcp_enabled=True def checkPackageInstalled(self,p): if Run("rpm -q " + p,chk_err=False): @@ -795,6 +857,12 @@ class SuSEDistro(AbstractDistro): def unregisterAgentService(self): self.stopAgentService() return self.uninstallAgentService() + + def startDHCP(self): + Run("service " + self.dhcp_client_name + " start", chk_err=False) + + def stopDHCP(self): + Run("service " + self.dhcp_client_name + " stop", chk_err=False) ############################################################ # redhatDistro @@ -929,6 +997,153 @@ class centosDistro(redhatDistro): def __init__(self): super(centosDistro,self).__init__() + +############################################################ +# CoreOSDistro +############################################################ + +class CoreOSDistro(AbstractDistro): + """ + CoreOS Distro concrete class + Put CoreOS specific behavior here... + """ + CORE_UID = 500 + + def __init__(self): + super(CoreOSDistro,self).__init__() + self.requiredDeps += [ "/usr/bin/systemctl" ] + self.agent_service_name = 'waagent' + self.init_script_file='/etc/systemd/system/waagent.service' + self.fileBlackList.append("/etc/machine-id") + self.dhcp_client_name='systemd-networkd' + self.getpidcmd='pidof ' + self.shadow_file_mode=0640 + self.waagent_path='/usr/share/oem/waagent/bin' + self.python_path='/usr/share/oem/python/bin' + self.dhcp_enabled=True + if 'PATH' in os.environ: + os.environ['PATH'] = "{0}:{1}".format(os.environ['PATH'], self.python_path) + else: + os.environ['PATH'] = self.python_path + + if 'PYTHONPATH' in os.environ: + os.environ['PYTHONPATH'] = "{0}:{1}".format(os.environ['PYTHONPATH'], self.waagent_path) + else: + os.environ['PYTHONPATH'] = self.waagent_path + + def checkPackageInstalled(self,p): + """ + There is no package manager in CoreOS. Return 1 since it must be preinstalled. + """ + return 1 + + def checkDependencies(self): + for a in self.requiredDeps: + if Run("which " + a + " > /dev/null 2>&1",chk_err=False): + Error("Missing required dependency: " + a) + return 1 + return 0 + + + def checkPackageUpdateable(self,p): + """ + There is no package manager in CoreOS. Return 0 since it can't be updated via package. + """ + return 0 + + def startAgentService(self): + return Run('systemctl start ' + self.agent_service_name) + + def stopAgentService(self): + return Run('systemctl stop ' + self.agent_service_name) + + def restartSshService(self): + """ + SSH is socket activated on CoreOS. No need to restart it. + """ + return 0 + + def sshDeployPublicKey(self,fprint,path): + """ + We support PKCS8. + """ + if Run("ssh-keygen -i -m PKCS8 -f " + fprint + " >> " + path): + return 1 + else : + return 0 + + def RestartInterface(self, iface): + Run("systemctl restart systemd-networkd") + + def CreateAccount(self, user, password, expiration, thumbprint): + """ + Create a user account, with 'user', 'password', 'expiration', ssh keys + and sudo permissions. + Returns None if successful, error string on failure. + """ + userentry = None + try: + userentry = pwd.getpwnam(user) + except: + pass + uidmin = None + try: + uidmin = int(GetLineStartingWith("UID_MIN", "/etc/login.defs").split()[1]) + except: + pass + if uidmin == None: + uidmin = 100 + if userentry != None and userentry[2] < uidmin and userentry[2] != self.CORE_UID: + Error("CreateAccount: " + user + " is a system user. Will not set password.") + return "Failed to set password for system user: " + user + " (0x06)." + if userentry == None: + command = "useradd --create-home --password '*' " + user + if expiration != None: + command += " --expiredate " + expiration.split('.')[0] + if Run(command): + Error("Failed to create user account: " + user) + return "Failed to create user account: " + user + " (0x07)." + else: + Log("CreateAccount: " + user + " already exists. Will update password.") + if password != None: + RunSendStdin("chpasswd", user + ":" + password + "\n") + try: + if password == None: + SetFileContents("/etc/sudoers.d/waagent", user + " ALL = (ALL) NOPASSWD: ALL\n") + else: + SetFileContents("/etc/sudoers.d/waagent", user + " ALL = (ALL) ALL\n") + os.chmod("/etc/sudoers.d/waagent", 0440) + except: + Error("CreateAccount: Failed to configure sudo access for user.") + return "Failed to configure sudo privileges (0x08)." + home = MyDistro.GetHome() + if thumbprint != None: + dir = home + "/" + user + "/.ssh" + CreateDir(dir, user, 0700) + pub = dir + "/id_rsa.pub" + prv = dir + "/id_rsa" + Run("ssh-keygen -y -f " + thumbprint + ".prv > " + pub) + SetFileContents(prv, GetFileContents(thumbprint + ".prv")) + for f in [pub, prv]: + os.chmod(f, 0600) + ChangeOwner(f, user) + SetFileContents(dir + "/authorized_keys", GetFileContents(pub)) + ChangeOwner(dir + "/authorized_keys", user) + Log("Created user account: " + user) + return None + + def startDHCP(self): + Run("systemctl start " + self.dhcp_client_name, chk_err=False) + + def stopDHCP(self): + Run("systemctl stop " + self.dhcp_client_name, chk_err=False) + + def translateCustomData(self, data): + return base64.b64decode(data) + + def getConfigurationPath(self): + return "/usr/share/oem/waagent.conf" + ############################################################ # debianDistro ############################################################ @@ -1155,7 +1370,7 @@ class UbuntuDistro(debianDistro): """ Ubuntu specific warning string from Deprovision. """ - print("WARNING! Nameserver configuration in /etc/resolvconf/resolv.conf.d/{tail,originial} will be deleted.") + print("WARNING! Nameserver configuration in /etc/resolvconf/resolv.conf.d/{tail,original} will be deleted.") def deprovisionDeleteFiles(self): """ @@ -1169,7 +1384,7 @@ class UbuntuDistro(debianDistro): else: Log("resolvconf is enabled; leaving /etc/resolv.conf intact") resolvConfD = '/etc/resolvconf/resolv.conf.d/' - self.fileBlackList.extend([resolvConfD + 'tail', resolvConfD + 'originial']) + self.fileBlackList.extend([resolvConfD + 'tail', resolvConfD + 'original']) for f in os.listdir(LibDir)+self.fileBlackList: try: os.remove(f) @@ -1213,6 +1428,21 @@ class LinuxMintDistro(UbuntuDistro): ############################################################ # fedoraDistro ############################################################ +fedora_systemd_service = """\ +[Unit] +Description=Windows Azure Linux Agent +After=network.target +After=sshd.service +ConditionFileIsExecutable=/usr/sbin/waagent +ConditionPathExists=/etc/waagent.conf + +[Service] +Type=simple +ExecStart=/usr/sbin/waagent -daemon + +[Install] +WantedBy=multi-user.target +""" class fedoraDistro(redhatDistro): """ @@ -1221,6 +1451,104 @@ class fedoraDistro(redhatDistro): """ def __init__(self): super(fedoraDistro,self).__init__() + self.service_cmd = '/usr/bin/systemctl' + self.hostname_file_path = '/etc/hostname' + self.init_script_file = '/usr/lib/systemd/system/' + self.agent_service_name + '.service' + self.init_file = fedora_systemd_service + self.grubKernelBootOptionsFile = '/etc/default/grub' + self.grubKernelBootOptionsLine = 'GRUB_CMDLINE_LINUX=' + + def publishHostname(self, name): + SetFileContents(self.hostname_file_path, name + '\n') + ethernetInterface = MyDistro.GetInterfaceName() + filepath = "/etc/sysconfig/network-scripts/ifcfg-" + ethernetInterface + if os.path.isfile(filepath): + ReplaceFileContentsAtomic(filepath, "DHCP_HOSTNAME=" + name + "\n" + + "\n".join(filter(lambda a: not a.startswith("DHCP_HOSTNAME"), GetFileContents(filepath).split('\n')))) + return 0 + + def installAgentServiceScriptFiles(self): + SetFileContents(self.init_script_file, self.init_file) + os.chmod(self.init_script_file, 0644) + return Run(self.service_cmd + ' daemon-reload') + + def registerAgentService(self): + self.installAgentServiceScriptFiles() + return Run(self.service_cmd + ' enable ' + self.agent_service_name) + + def uninstallAgentService(self): + """ + Call service subsystem to remove waagent script. + """ + return Run(self.service_cmd + ' disable ' + self.agent_service_name) + + def unregisterAgentService(self): + """ + Calls self.stopAgentService and call self.uninstallAgentService() + """ + self.stopAgentService() + self.uninstallAgentService() + + def startAgentService(self): + """ + Service call to start the Agent service + """ + return Run(self.service_cmd + ' start ' + self.agent_service_name) + + def stopAgentService(self): + """ + Service call to stop the Agent service + """ + return Run(self.service_cmd + ' stop ' + self.agent_service_name, False) + + def restartSshService(self): + """ + Service call to re(start) the SSH service + """ + sshRestartCmd = self.service_cmd + " " + self.ssh_service_restart_option + " " + self.ssh_service_name + retcode = Run(sshRestartCmd) + if retcode > 0: + Error("Failed to restart SSH service with return code:" + str(retcode)) + return retcode + + def checkPackageInstalled(self, p): + """ + Query package database for prescence of an installed package. + """ + import rpm + ts = rpm.TransactionSet() + rpms = ts.dbMatch(rpm.RPMTAG_PROVIDES, p) + return bool(len(rpms) > 0) + + def deleteRootPassword(self): + return Run("/sbin/usermod root -p '!!'") + + def packagedInstall(self,buildroot): + """ + Called from setup.py for use by RPM. + Copies generated files waagent.conf, under the buildroot. + """ + if not os.path.exists(buildroot+'/etc'): + os.mkdir(buildroot+'/etc') + SetFileContents(buildroot+'/etc/waagent.conf', MyDistro.waagent_conf_file) + + if not os.path.exists(buildroot+'/etc/logrotate.d'): + os.mkdir(buildroot+'/etc/logrotate.d') + SetFileContents(buildroot+'/etc/logrotate.d/WALinuxAgent', WaagentLogrotate) + + self.init_script_file=buildroot+self.init_script_file + # this allows us to call installAgentServiceScriptFiles() + if not os.path.exists(os.path.dirname(self.init_script_file)): + os.mkdir(os.path.dirname(self.init_script_file)) + self.installAgentServiceScriptFiles() + + def CreateAccount(self, user, password, expiration, thumbprint): + super(fedoraDistro, self).CreateAccount(user, password, expiration, thumbprint) + Run('/sbin/usermod ' + user + ' -G wheel') + + def DeleteAccount(self, user): + Run('/sbin/usermod ' + user + ' -G ""') + super(fedoraDistro, self).DeleteAccount(user) ############################################################ # FreeBSD @@ -1310,6 +1638,7 @@ if Run("mount " + device + "s1 " + mountpoint): waagent.Error("ActivateResourceDisk: Failed to mount resource disk (" + device + "s1).") sys.exit(0) waagent.Log("Resource disk (" + device + "s1) is mounted at " + mountpoint + " with fstype " + fs) +waagent.SetFileContents(os.path.join(mountpoint,waagent.README_FILENAME), waagent.README_FILECONTENT) swap = Config.get("ResourceDisk.EnableSwap") if swap == None or swap.lower().startswith("n"): sys.exit(0) @@ -1347,7 +1676,7 @@ class FreeBSDDistro(AbstractDistro): self.init_file=bsd_init_file self.agent_package_name='WALinuxAgent' self.fileBlackList = [ "/root/.bash_history", "/var/log/waagent.log",'/etc/resolv.conf' ] - self.agent_files_to_uninstall = ["/etc/waagent.conf", "/usr/local/etc/sudoers.d/waagent"] + self.agent_files_to_uninstall = ["/etc/waagent.conf"] self.grubKernelBootOptionsFile = '/boot/loader.conf' self.grubKernelBootOptionsLine = '' self.getpidcmd = 'pgrep -n' @@ -1455,7 +1784,7 @@ class FreeBSDDistro(AbstractDistro): code,output=RunGetOutput("ifconfig",chk_err=False) Log(output) retries=10 - cmd='ifconfig | grep -A1 -B2 ether | grep -B3 inet | grep -A3 UP ' + cmd='ifconfig | grep -A2 -B2 ether | grep -B3 inet | grep -A4 UP ' code=1 while code > 0 : @@ -1501,7 +1830,8 @@ class FreeBSDDistro(AbstractDistro): pass uidmin = None try: - uidmin = int(GetLineStartingWith("UID_MIN", "/etc/login.defs").split()[1]) + if os.path.isfile("/etc/login.defs"): + uidmin = int(GetLineStartingWith("UID_MIN", "/etc/login.defs").split()[1]) except: pass if uidmin == None: @@ -1568,7 +1898,8 @@ class FreeBSDDistro(AbstractDistro): return uidmin = None try: - uidmin = int(GetLineStartingWith("UID_MIN", "/etc/login.defs").split()[1]) + if os.path.isfile("/etc/login.defs"): + uidmin = int(GetLineStartingWith("UID_MIN", "/etc/login.defs").split()[1]) except: pass if uidmin == None: @@ -1577,7 +1908,7 @@ class FreeBSDDistro(AbstractDistro): Error("DeleteAccount: " + user + " is a system user. Will not delete account.") return Run("> /var/run/utmp") #Delete utmp to prevent error if we are the 'user' deleted - Run("rmuser -y " + user) + pid = subprocess.Popen(['rmuser', '-y', user], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE).pid try: os.remove(MyDistro.sudoers_dir_base+"/sudoers.d/waagent") except: @@ -1679,6 +2010,12 @@ class FreeBSDDistro(AbstractDistro): def setBlockDeviceTimeout(self, device, timeout): return + def getProcessorCores(self): + return int(RunGetOutput("sysctl hw.ncpu | awk '{print $2}'")[1]) + + def getTotalMemory(self): + return int(RunGetOutput("sysctl hw.realmem | awk '{print $2}'")[1])/1024 + ############################################################ # END DISTRO CLASS DEFS ############################################################ @@ -2034,8 +2371,9 @@ def SimpleLog(file_path,message): t = "%04u/%02u/%02u %02u:%02u:%02u " % (t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec) lines=re.sub(re.compile(r'^(.)',re.MULTILINE),t+r'\1',message) with open(file_path, "a") as F : + lines = filter(lambda x : x in string.printable, lines) F.write(lines.encode('ascii','ignore') + "\n") - + class Logger(object): """ The Agent's logging assumptions are: @@ -2066,9 +2404,13 @@ class Logger(object): Write 'message' to logfile. """ if self.file_path: - with open(self.file_path, "a") as F : - F.write(message.encode('ascii','ignore') + "\n") - F.close() + try: + with open(self.file_path, "a") as F : + message = filter(lambda x : x in string.printable, message) + F.write(message.encode('ascii','ignore') + "\n") + except IOError, e: + print e + pass def LogToCon(self,message): """ @@ -2077,9 +2419,13 @@ class Logger(object): is redirected to ttys0 in kernel boot options. """ if self.con_path: - with open(self.con_path, "w") as C : - C.write(message.encode('ascii','ignore') + "\n") - C.close() + try: + with open(self.con_path, "w") as C : + message = filter(lambda x : x in string.printable, message) + C.write(message.encode('ascii','ignore') + "\n") + except IOError, e: + print e + pass def Log(self,message): """ @@ -2243,140 +2589,233 @@ class Util(object): Http communication class. Base of GoalState, and Agent classes. """ - def _HttpGet(self, url, headers): + __RetryWaitingInterval=10 + + def __init__(self): + self.Endpoint = None + + def _ParseUrl(self, url): + secure = False + host = self.Endpoint + action = url + + #Strip "http[s]://hostname/" from url + if url.startswith("http://"): + url = url[7:] + pos = url.index("/") + if pos > 0: + host = url[0: pos] + action = url[pos:] + elif url.startswith("https://"): + secure = True + url = url[8:] + pos = url.index("/") + if pos > 0: + host = url[0:pos] + action = url[pos:] + return host, action, secure + + def _HttpRequest(self, method, host, action, data=None, + secure=False, headers=None): + resp = None; + try: + httpConnection = None + + #If httplib module is not built with ssl support. Failback to http + if secure and hasattr(httplib, "HTTPSConnection"): + httpConnection = httplib.HTTPSConnection(host) + else: + httpConnection = httplib.HTTPConnection(host) + if headers == None: + httpConnection.request(method, action, data) + else: + httpConnection.request(method, action, data, headers) + resp = httpConnection.getresponse() + except httplib.HTTPException, e: + Error('HTTPException {0}, args:{1}'.format(e, repr(e.args))) + except IOError, e: + Error('Socket IOError {0}, args:{1}'.format(e, repr(e.args))) + return resp + + def HttpRequest(self, method, url, data, headers=None, maxRetry=3): """ - Do HTTP get on 'url' with 'headers'. + Sending http request to server On error, sleep 10 and maxRetry times. Return the output buffer or None. """ - LogIfVerbose("HttpGet(" + url + ")") - maxRetry = 2 - if url.startswith("http://"): - url = url[7:] - url = url[url.index("/"):] - for retry in range(0, maxRetry + 1): - strRetry = str(retry) - log = [NoLog, Error][retry > 0] - log("retry HttpGet(" + url + "),retry=" + strRetry) - response = None - strStatus = "None" - try: - httpConnection = httplib.HTTPConnection(self.Endpoint) - if headers == None: - request = httpConnection.request("GET", url) - else: - request = httpConnection.request("GET", url, None, headers) - response = httpConnection.getresponse() - strStatus = str(response.status) - except httplib.HTTPException, e: - Error('HTTPException ' + e.message + ' args: ' + repr(e.args)) - except IOError, e: - Error('socket IOError ' + e.message + ' args: ' + repr(e.args)) - log("response HttpGet(" + url + "),retry=" + strRetry + ",status=" + strStatus) - if response == None or response.status != httplib.OK: - Error("HttpGet(" + url + "),retry=" + strRetry + ",status=" + strStatus) - if retry == maxRetry: - Error("return HttpGet(" + url + "),retry=" + strRetry + ",status=" + strStatus) - return None - else: - Error("sleep 10 seconds HttpGet(" + url + "),retry=" + strRetry + ",status=" + strStatus) - time.sleep(10) + LogIfVerbose("HTTP Req: {0} {1}".format(method, url)) + LogIfVerbose("HTTP Req: Data={0}".format(data)) + LogIfVerbose("HTTP Req: Header={0}".format(headers)) + host, action, secure = self._ParseUrl(url) + resp = self._HttpRequest(method, host, action, data, secure, headers) + for retry in range(0, maxRetry): + if resp is not None and \ + (resp.status == httplib.OK or \ + resp.status == httplib.CREATED or \ + resp.status == httplib.ACCEPTED): + return resp; + + Error("Retry={0}".format(retry)) + Error("HTTP Req: {0} {1}".format(method, url)) + Error("HTTP Req: Data={0}".format(data)) + Error("HTTP Req: Header={0}".format(headers)) + if resp is None: + Error("HTTP Err: response is empty.".format(retry)) else: - log("return HttpGet(" + url + "),retry=" + strRetry + ",status=" + strStatus) - return response.read() + Error("HTTP Err: Status={0}".format(resp.status)) + Error("HTTP Err: Reason={0}".format(resp.reason)) + Error("HTTP Err: Header={0}".format(resp.getheaders())) + Error("HTTP Err: Body={0}".format(resp.read())) + time.sleep(self.__class__.__RetryWaitingInterval) + resp = self._HttpRequest(method, host, action, data, secure, + headers) + + return None + + def HttpGet(self, url, headers=None, maxRetry=3): + return self.HttpRequest("GET", url, None, headers, maxRetry) + + def HttpHead(self, url, headers=None, maxRetry=3): + return self.HttpRequest("HEAD", url, None, headers, maxRetry) + + def HttpPost(self, url, data, headers=None, maxRetry=3): + return self.HttpRequest("POST", url, data, headers, maxRetry) + + def HttpPut(self, url, data, headers=None, maxRetry=3): + return self.HttpRequest("PUT", url, data, headers, maxRetry) - def HttpGetWithoutHeaders(self, url): + def HttpDelete(url, data, headers=None, maxRetry=3): + return self.HttpRequest("DELETE", url, data, headers, maxRetry) + + def HttpGetWithoutHeaders(self, url, maxRetry=3): """ Return data from an HTTP get on 'url'. """ - return self._HttpGet(url, None) + resp = self.HttpGet(url, None, maxRetry) + return resp.read() if resp is not None else None - def HttpGetWithHeaders(self, url): + def HttpGetWithHeaders(self, url, maxRetry=3): """ Return data from an HTTP get on 'url' with x-ms-agent-name and x-ms-version headers. """ - return self._HttpGet(url, {"x-ms-agent-name": GuestAgentName, "x-ms-version": ProtocolVersion}) + resp = self.HttpGet(url, { + "x-ms-agent-name": GuestAgentName, + "x-ms-version": ProtocolVersion + }, maxRetry) + return resp.read() if resp is not None else None - def HttpSecureGetWithHeaders(self, url, transportCert): + def HttpSecureGetWithHeaders(self, url, transportCert, maxRetry=3): """ Return output of get using ssl cert. """ - return self._HttpGet(url, {"x-ms-agent-name": GuestAgentName, - "x-ms-version": ProtocolVersion, - "x-ms-cipher-name": "DES_EDE3_CBC", - "x-ms-guest-agent-public-x509-cert": transportCert}) + resp = self.HttpGet(url, { + "x-ms-agent-name": GuestAgentName, + "x-ms-version": ProtocolVersion, + "x-ms-cipher-name": "DES_EDE3_CBC", + "x-ms-guest-agent-public-x509-cert": transportCert + }, maxRetry) + return resp.read() if resp is not None else None + + def HttpPostWithHeaders(self, url, data, maxRetry=3): + header = { + "x-ms-agent-name": GuestAgentName, + "Content-Type": "text/xml; charset=utf-8", + "x-ms-version": ProtocolVersion + } + return self.HttpPost(url, data, header, maxRetry) + +__StorageVersion="2014-02-14" + +def GetBlobType(url): + restutil = Util() + #Check blob type + LogIfVerbose("Check blob type.") + timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + blobPropResp = restutil.HttpHead(url, { + "x-ms-date" : timestamp, + 'x-ms-version' : __StorageVersion + }); + blobType = None + if blobPropResp is None: + Error("Can't get status blob type.") + return None + blobType = blobPropResp.getheader("x-ms-blob-type") + LogIfVerbose("Blob type={0}".format(blobType)) + return blobType + +def PutBlockBlob(url, data): + restutil = Util() + LogIfVerbose("Upload block blob") + timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + ret = restutil.HttpPut(url, data, { + "x-ms-date" : timestamp, + "x-ms-blob-type" : "BlockBlob", + "Content-Length": str(len(data)), + "x-ms-version" : __StorageVersion + }) + if ret is None: + Error("Failed to upload block blob for status.") + +def PutPageBlob(url, data): + restutil = Util() + LogIfVerbose("Replace old page blob") + timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + #Align to 512 bytes + pageBlobSize = ((len(data) + 511) / 512) * 512 + ret = restutil.HttpPut(url, "", { + "x-ms-date" : timestamp, + "x-ms-blob-type" : "PageBlob", + "Content-Length": "0", + "x-ms-blob-content-length" : str(pageBlobSize), + "x-ms-version" : __StorageVersion + }) + if ret is None: + Error("Failed to clean up page blob for status") + return + + if url.index('?') < 0: + url = "{0}?comp=page".format(url) + else: + url = "{0}&comp=page".format(url) + + LogIfVerbose("Upload page blob") + pageMax = 4 * 1024 * 1024 #Max page size: 4MB + start = 0 + end = 0 + while end < len(data): + end = min(len(data), start + pageMax) + contentSize = end - start + #Align to 512 bytes + pageEnd = ((end + 511) / 512) * 512 + bufSize = pageEnd - start + buf = bytearray(bufSize) + buf[0 : contentSize] = data[start : end] + ret = restutil.HttpPut(url, buf, { + "x-ms-date" : timestamp, + "x-ms-range" : "bytes={0}-{1}".format(start, pageEnd - 1), + "x-ms-page-write" : "update", + "x-ms-version" : __StorageVersion, + "Content-Length": str(pageEnd - start) + }) + if ret is None: + Error("Failed to upload page blob for status") + return + start = end - def HttpPost(self, url, data): - """ - Send http POST to server, sleeping 10 retrying maxRetry times upon error. - """ - LogIfVerbose("HttpPost(" + url + ")") - maxRetry = 2 - for retry in range(0, maxRetry + 1): - strRetry = str(retry) - log = [NoLog, Error][retry > 0] - log("retry HttpPost(" + url + "),retry=" + strRetry) - response = None - strStatus = "None" - try: - httpConnection = httplib.HTTPConnection(self.Endpoint) - request = httpConnection.request("POST", url, data, {"x-ms-agent-name": GuestAgentName, - "Content-Type": "text/xml; charset=utf-8", - "x-ms-version": ProtocolVersion}) - response = httpConnection.getresponse() - strStatus = str(response.status) - except httplib.HTTPException, e: - Error('HTTPException ' + e.message + ' args: ' + repr(e.args)) - except IOError, e: - Error('socket IOError ' + e.message + ' args: ' + repr(e.args)) - log("response HttpPost(" + url + "),retry=" + strRetry + ",status=" + strStatus) - if response == None or (response.status != httplib.OK and response.status != httplib.ACCEPTED): - Error("HttpPost(" + url + "),retry=" + strRetry + ",status=" + strStatus) - if retry == maxRetry: - Error("return HttpPost(" + url + "),retry=" + strRetry + ",status=" + strStatus) - return None - else: - Error("sleep 10 seconds HttpPost(" + url + "),retry=" + strRetry + ",status=" + strStatus) - time.sleep(10) - else: - log("return HttpPost(" + url + "),retry=" + strRetry + ",status=" + strStatus) - return response +def UploadStatusBlob(url, data): + LogIfVerbose("Upload status blob") + LogIfVerbose("Status={0}".format(data)) + blobType = GetBlobType(url) - def HttpPutBlockBlob(self, url, data): - """ - Send http PUT to server, sleeping 10 retrying maxRetry times upon error. - """ - LogIfVerbose("HttpPutBlockBlob(" + url + ")") - maxRetry = 2 - for retry in range(0, maxRetry + 1): - strRetry = str(retry) - log = [NoLog, Error][retry > 0] - log("retry HttpPutBlockBlob(" + url + "),retry=" + strRetry) - response = None - strStatus = "None" - try: - httpConnection = httplib.HTTPConnection(self.Endpoint) - request = httpConnection.request("PUT", url, data, {"x-ms-blob-type" : "BlockBlob", "x-ms-date" : time.strftime("%Y-%M-%dT%H:%M:%SZ", time.gmtime()) ,"Content-Length": str(len(data))} ) - response = httpConnection.getresponse() - strStatus = str(response.status) - except httplib.HTTPException, e: - Error('HTTPException ' + e.message + ' args: ' + repr(e.args)) - except IOError, e: - Error('socket IOError ' + e.message + ' args: ' + repr(e.args)) - log("response HttpPutBlockBlob(" + url + "),retry=" + strRetry + ",status=" + strStatus) - if response == None or (response.status != httplib.OK and response.status != httplib.CREATED): - Error("HttpPutBlockBlob(" + url + "),retry=" + strRetry + ",status=" + strStatus) - if retry == maxRetry: - Error("return HttpPutBlockBlob(" + url + "),retry=" + strRetry + ",status=" + strStatus) - return None - else: - Error("sleep 10 seconds HttpPutBlockBlob(" + url + "),retry=" + strRetry + ",status=" + strStatus) - time.sleep(10) - else: - log("return HttpPutBlockBlob(" + url + "),retry=" + strRetry + ",status=" + strStatus) - return response + if blobType == "BlockBlob": + PutBlockBlob(url, data) + elif blobType == "PageBlob": + PutPageBlob(url, data) + else: + Error("Unknown blob type: {0}".format(blobType)) + return None class TCPHandler(SocketServer.BaseRequestHandler): """ @@ -2434,14 +2873,18 @@ class LoadBalancerProbeServer(object): class ConfigurationProvider(object): """ - Parse amd store key:values in /etc/waagent.conf. + Parse amd store key:values in waagent.conf """ def __init__(self): self.values = dict() - if os.path.isfile("/etc/waagent.conf") == False: - raise Exception("Missing configuration in /etc/waagent.conf") + if 'MyDistro' not in globals(): + global MyDistro + MyDistro = GetMyDistro() + walaConfigFile = MyDistro.getConfigurationPath() + if os.path.isfile(walaConfigFile) == False: + raise Exception("Missing configuration in {0}".format(walaConfigFile)) try: - for line in GetFileContents("/etc/waagent.conf").split('\n'): + for line in GetFileContents(walaConfigFile).split('\n'): if not line.startswith("#") and "=" in line: parts = line.split()[0].split('=') value = parts[1].strip("\" ") @@ -2450,7 +2893,7 @@ class ConfigurationProvider(object): else: self.values[parts[0]] = None except: - Error("Unable to parse /etc/waagent.conf") + Error("Unable to parse {0}".format(walaConfigFile)) raise return @@ -2678,36 +3121,61 @@ class SharedConfig(object): """ Reset members. """ - self.Deployment = None - self.Incarnation = None - self.Role = None - self.LoadBalancerSettings = None - self.OutputEndpoints = None - self.Instances = None + self.RdmaMacAddress = None + self.RdmaIPv4Address = None + self.xmlText = None def Parse(self, xmlText): """ Parse and write configuration to file SharedConfig.xml. """ self.reinitialize() - SetFileContents("SharedConfig.xml", xmlText) + self.xmlText = xmlText dom = xml.dom.minidom.parseString(xmlText) for a in [ "SharedConfig", "Deployment", "Service", "ServiceInstance", "Incarnation", "Role", ]: if not dom.getElementsByTagName(a): Error("SharedConfig.Parse: Missing " + a) - return None + node = dom.childNodes[0] if node.localName != "SharedConfig": Error("SharedConfig.Parse: root not SharedConfig") - return None + + nodes = dom.getElementsByTagName("Instance") + if nodes is not None and len(nodes) != 0: + node = nodes[0] + if node.hasAttribute("rdmaMacAddress"): + self.RdmaMacAddress = node.getAttribute("rdmaMacAddress") + if node.hasAttribute("rdmaIPv4Address"): + self.RdmaIPv4Address = node.getAttribute("rdmaIPv4Address") + return self + + def Save(self): + SetFileContents("SharedConfig.xml", self.xmlText) + + def ConfigRdma(self, dev="/dev/hvnd_rdma"): + if self.RdmaIPv4Address is not None and self.RdmaMacAddress is not None: + if os.path.isfile(dev): + data = ('rdmaMacAddress="{0}" rdmaIPv4Address="{1}"' + '').format(self.RdmaMacAddress, self.RdmaIPv4Address) + Log("Write rdma config to {0}: {1}".format(dev, data)) + try: + with open(dev, "w") as c: + c.write(data) + except IOError, e: + Error("Error writing {0}, {1}".format(dev, e)) + + def InvokeTopologyConsumer(self): program = Config.get("Role.TopologyConsumer") if program != None: try: Children.append(subprocess.Popen([program, LibDir + "/SharedConfig.xml"])) except OSError, e : ErrorWithPrefix('Agent.Run','Exception: '+ str(e) +' occured launching ' + program ) - return self + + def Process(self): + self.ConfigRdma() + self.InvokeTopologyConsumer() class ExtensionsConfig(object): """ @@ -2777,11 +3245,14 @@ class ExtensionsConfig(object): try: self.Extensions=dom.getElementsByTagName("Extensions") pg = dom.getElementsByTagName("Plugins") - self.Plugins = pg[0].getElementsByTagName("Plugin") + if len(pg) > 0: + self.Plugins = pg[0].getElementsByTagName("Plugin") + else: + self.Plugins = [] incarnation=self.Extensions[0].getAttribute("goalStateIncarnation") SetFileContents('ExtensionsConfig.'+incarnation+'.xml', xmlText) - except: - LogIfVerbose('ERROR: Error parsing ExtensionsConfig.') + except Exception, e: + Error('ERROR: Error parsing ExtensionsConfig: {0}.'.format(e)) return None for p in self.Plugins: if len(p.getAttribute("location"))<1: # this plugin is inside the PluginSettings @@ -2819,7 +3290,7 @@ class ExtensionsConfig(object): Error('Unable to disable '+name) SimpleLog(p.plugin_log,'ERROR: Unable to disable '+name) else : - self.SetHandlerState(handler, 'Installed') + self.SetHandlerState(handler, 'Disabled') Log(name+' is disabled') SimpleLog(p.plugin_log,name+' is disabled') @@ -2869,7 +3340,9 @@ class ExtensionsConfig(object): SimpleLog(p.plugin_log,"Plugin failover server is: " + self.Util.Endpoint) manifest=self.Util.HttpGetWithoutHeaders(failoverlocation) + #if failoverlocation also fail what to do then? if manifest == None: + AddExtensionEvent(name,WALAEventOperation.Download,False,0,version,"Download mainfest fail "+failoverlocation) Log("Plugin manifest" + name + "downloaded successfully length = " + str(len(manifest))) SimpleLog(p.plugin_log,"Plugin manifest" + name + "downloaded successfully length = " + str(len(manifest))) @@ -2902,9 +3375,11 @@ class ExtensionsConfig(object): # Download the zipfile archive and save as '.zip' bundle=self.Util.HttpGetWithoutHeaders(bundle_uri) if bundle == None: + AddExtensionEvent(name,WALAEventOperation.Download,True,0,version,"Download zip fail "+bundle_uri) Error("Unable to download plugin bundle" + bundle_uri ) SimpleLog(p.plugin_log,"Unable to download plugin bundle" + bundle_uri ) continue + AddExtensionEvent(name,WALAEventOperation.Download,True,0,version,"Download Success") b=bytearray(bundle) filepath=LibDir+"/" + os.path.basename(bundle_uri) + '.zip' SetFileContents(filepath,b) @@ -2976,12 +3451,12 @@ class ExtensionsConfig(object): Log(name+' version ' + previous_version + ' is disabled') SimpleLog(p.plugin_log,name+' version ' + previous_version + ' is disabled') - + isupgradeSuccess = True if getcmd=='updateCommand': if self.launchCommand(p.plugin_log,name,version,getcmd,previous_version) == None : Error('Update failed for '+name+'-'+version) SimpleLog(p.plugin_log,'Update failed for '+name+'-'+version) - + isupgradeSuccess=False else : Log('Update complete'+name+'-'+version) SimpleLog(p.plugin_log,'Update complete'+name+'-'+version) @@ -2991,18 +3466,17 @@ class ExtensionsConfig(object): self.SetHandlerState(previous_handler, 'Installed') Error('Uninstall failed for '+name+'-'+previous_version) SimpleLog(p.plugin_log,'Uninstall failed for '+name+'-'+previous_version) - + isupgradeSuccess=False else : self.SetHandlerState(previous_handler, 'NotInstalled') Log('Uninstall complete'+ previous_handler ) SimpleLog(p.plugin_log,'Uninstall complete'+ name +'-' + previous_version) - + AddExtensionEvent(name,WALAEventOperation.Upgrade,isupgradeSuccess,0,previous_version) else : # run install if self.launchCommand(p.plugin_log,name,version,getcmd) == None : self.SetHandlerState(handler, 'NotInstalled') Error('Installation failed for '+name+'-'+version) SimpleLog(p.plugin_log,'Installation failed for '+name+'-'+version) - else : self.SetHandlerState(handler, 'Installed') Log('Installation completed for '+name+'-'+version) @@ -3085,8 +3559,25 @@ class ExtensionsConfig(object): pass return self - def launchCommand(self,plugin_log,name,version,command,prev_version=None): + commandToEventOperation={ + "installCommand":WALAEventOperation.Install, + "uninstallCommand":WALAEventOperation.UnIsntall, + "updateCommand": WALAEventOperation.Upgrade, + "enableCommand": WALAEventOperation.Enable, + "disableCommand": WALAEventOperation.Disable, + } + isSuccess=True + start = datetime.datetime.now() + r=self.__launchCommandWithoutEventLog(plugin_log,name,version,command,prev_version) + if r==None: + isSuccess=False + Duration = int((datetime.datetime.now() - start).seconds) + if commandToEventOperation.get(command): + AddExtensionEvent(name,commandToEventOperation[command],isSuccess,Duration,version) + return r + + def __launchCommandWithoutEventLog(self,plugin_log,name,version,command,prev_version=None): # get the manifest and read the command mfile=None zip_dir=LibDir+"/" + name + '-' + version @@ -3099,7 +3590,7 @@ class ExtensionsConfig(object): if mfile == None : Error('HandlerManifest.json not found.') SimpleLog(plugin_log,'HandlerManifest.json not found.') - + return None manifest = GetFileContents(mfile) try: @@ -3234,8 +3725,8 @@ class ExtensionsConfig(object): except: Error('Error parsing ExtensionsConfig. Unable to send status reports') return None - self.Util.Endpoint=uri.split('/')[2] - self.Util.HttpPutBlockBlob(uri, status) + + UploadStatusBlob(uri, status.encode("utf-8")) LogIfVerbose('Status report '+status+' sent to ' + uri) return True @@ -3622,6 +4113,8 @@ class GoalState(Util): Calls HostingEnvironmentConfig.Process() """ self.HostingEnvironmentConfig.Process() + self.SharedConfig.Process() + self.SharedConfig.Save() class OvfEnv(object): """ @@ -3682,7 +4175,7 @@ class OvfEnv(object): Return self. """ self.reinitialize() - LogIfVerbose(xmlText) + LogIfVerbose(re.sub("<UserPassword>.*?<", "<UserPassword>*<", xmlText)) dom = xml.dom.minidom.parseString(xmlText) if len(dom.getElementsByTagNameNS(self.OvfNs, "Environment")) != 1: Error("Unable to parse OVF XML.") @@ -3718,7 +4211,7 @@ class OvfEnv(object): if len(CDSection) > 0 : self.CustomData=GetNodeTextData(CDSection[0]) if len(self.CustomData)>0: - SetFileContents(LibDir + '/CustomData',self.CustomData) + SetFileContents(LibDir + '/CustomData', MyDistro.translateCustomData(self.CustomData)) Log('Wrote ' + LibDir + '/CustomData') else : Error('<CustomData> contains no data!') @@ -3849,6 +4342,7 @@ class OvfEnv(object): MyDistro.setSelinuxEnforce(0) home = MyDistro.GetHome() for pkey in self.SshPublicKeys: + Log("Deploy public key:{0}".format(pkey[0])) if not os.path.isfile(pkey[0] + ".crt"): Error("PublicKey not found: " + pkey[0]) error = "Failed to deploy public key (0x09)." @@ -3865,6 +4359,7 @@ class OvfEnv(object): if path.startswith(os.path.normpath(home + "/" + self.UserName + "/")): ChangeOwner(path, self.UserName) for keyp in self.SshKeyPairs: + Log("Deploy key pair:{0}".format(keyp[0])) if not os.path.isfile(keyp[0] + ".prv"): Error("KeyPair not found: " + keyp[0]) error = "Failed to deploy key pair (0x0A)." @@ -3889,6 +4384,240 @@ class OvfEnv(object): MyDistro.restartSshService() return error + +class WALAEvent(object): + def __init__(self): + + self.providerId="" + self.eventId=1 + + self.OpcodeName="" + self.KeywordName="" + self.TaskName="" + self.TenantName="" + self.RoleName="" + self.RoleInstanceName="" + self.ContainerId="" + self.ExecutionMode="IAAS" + self.OSVersion="" + self.GAVersion="" + self.RAM=0 + self.Processors=0 + + + def ToXml(self): + strEventid=u'<Event id="{0}"/>'.format(self.eventId) + strProviderid=u'<Provider id="{0}"/>'.format(self.providerId) + strRecordFormat = u'<Param Name="{0}" Value="{1}" T="{2}" />' + strRecordNoQuoteFormat = u'<Param Name="{0}" Value={1} T="{2}" />' + strMtStr=u'mt:wstr' + strMtUInt64=u'mt:uint64' + strMtBool=u'mt:bool' + strMtFloat=u'mt:float64' + strEventsData=u"" + + for attName in self.__dict__: + if attName in ["eventId","filedCount","providerId"]: + continue + + attValue = self.__dict__[attName] + if type(attValue) is int: + strEventsData+=strRecordFormat.format(attName,attValue,strMtUInt64) + continue + if type(attValue) is str: + attValue = xml.sax.saxutils.quoteattr(attValue) + strEventsData+=strRecordNoQuoteFormat.format(attName,attValue,strMtStr) + continue + if str(type(attValue)).count("'unicode'") >0 : + attValue = xml.sax.saxutils.quoteattr(attValue) + strEventsData+=strRecordNoQuoteFormat.format(attName,attValue,strMtStr) + continue + if type(attValue) is bool: + strEventsData+=strRecordFormat.format(attName,attValue,strMtBool) + continue + if type(attValue) is float: + strEventsData+=strRecordFormat.format(attName,attValue,strMtFloat) + continue + + Log("Warning: property "+attName+":"+str(type(attValue))+":type"+str(type(attValue))+"Can't convert to events data:"+":type not supported") + + return u"<Data>{0}{1}{2}</Data>".format(strProviderid,strEventid,strEventsData) + + def Save(self): + eventfolder = LibDir+"/events" + if not os.path.exists(eventfolder): + os.mkdir(eventfolder) + os.chmod(eventfolder,0700) + if len(os.listdir(eventfolder)) > 1000: + raise Exception("WriteToFolder:Too many file under "+datafolder+" exit") + + filename = os.path.join(eventfolder,str(int(time.time()*1000000))) + with open(filename+".tmp",'wb+') as hfile: + hfile.write(self.ToXml().encode("utf-8")) + os.rename(filename+".tmp",filename+".tld") + + +class WALAEventOperation: + HeartBeat="HeartBeat" + Provision = "Provision" + Install = "Install" + UnIsntall = "UnInstall" + Disable = "Disable" + Enable = "Enable" + Download = "Download" + Upgrade = "Upgrade" + Update = "Update" + +def AddExtensionEvent(name,op,isSuccess,duration=0,version="1.0",message="",type="",isInternal=False): + event = ExtensionEvent() + event.Name=name + event.Version=version + event.IsInternal=isInternal + event.Operation=op + event.OperationSuccess=isSuccess + event.Message=message + event.Duration=duration + event.ExtensionType=type + try: + event.Save() + except: + Error("Error "+traceback.format_exc()) + + +class ExtensionEvent(WALAEvent): + def __init__(self): + + WALAEvent.__init__(self) + self.eventId=1 + self.providerId="69B669B9-4AF8-4C50-BDC4-6006FA76E975" + self.Name="" + self.Version="" + self.IsInternal=False + self.Operation="" + self.OperationSuccess=True + self.ExtensionType="" + self.Message="" + self.Duration=0 + + +class WALAEventMonitor(WALAEvent): + def __init__(self,postMethod): + WALAEvent.__init__(self) + self.post = postMethod + self.sysInfo={} + self.eventdir = LibDir+"/events" + self.issysteminfoinitilized = False + + def StartEventsLoop(self): + eventThread = threading.Thread(target = self.EventsLoop) + eventThread.setDaemon(True) + eventThread.start() + + def EventsLoop(self): + LastReportHeartBeatTime = datetime.datetime.min + try: + while(True): + if (datetime.datetime.now()-LastReportHeartBeatTime) > datetime.timedelta(hours=12): + LastReportHeartBeatTime = datetime.datetime.now() + AddExtensionEvent(op=WALAEventOperation.HeartBeat,name="WALA",isSuccess=True) + self.postNumbersInOneLoop=0 + self.CollectAndSendWALAEvents() + time.sleep(60) + except: + Error("Exception in events loop:"+traceback.format_exc()) + + def SendEvent(self,providerid,events): + dataFormat = u'<?xml version="1.0"?><TelemetryData version="1.0"><Provider id="{0}">{1}'\ + '</Provider></TelemetryData>' + data = dataFormat.format(providerid,events) + self.post("/machine/?comp=telemetrydata", data) + + def CollectAndSendWALAEvents(self): + if not os.path.exists(self.eventdir): + return + #Throtting, can't send more than 3 events in 15 seconds + eventSendNumber=0 + eventFiles = os.listdir(self.eventdir) + events = {} + for file in eventFiles: + if not file.endswith(".tld"): + continue + with open(os.path.join(self.eventdir,file),"rb") as hfile: + #if fail to open or delete the file, throw exception + xmlStr = hfile.read().decode("utf-8",'ignore') + os.remove(os.path.join(self.eventdir,file)) + params="" + eventid="" + providerid="" + #if exception happen during process an event, catch it and continue + try: + xmlStr = self.AddSystemInfo(xmlStr) + for node in xml.dom.minidom.parseString(xmlStr.encode("utf-8")).childNodes[0].childNodes: + if node.tagName == "Param": + params+=node.toxml() + if node.tagName == "Event": + eventid=node.getAttribute("id") + if node.tagName == "Provider": + providerid = node.getAttribute("id") + except: + Error(traceback.format_exc()) + continue + if len(params)==0 or len(eventid)==0 or len(providerid)==0: + Error("Empty filed in params:"+params+" event id:"+eventid+" provider id:"+providerid) + continue + + eventstr = u'<Event id="{0}"><![CDATA[{1}]]></Event>'.format(eventid,params) + if not events.get(providerid): + events[providerid]="" + if len(events[providerid]) >0 and len(events.get(providerid)+eventstr)>= 63*1024: + eventSendNumber+=1 + self.SendEvent(providerid,events.get(providerid)) + if eventSendNumber %3 ==0: + time.sleep(15) + events[providerid]="" + if len(eventstr) >= 63*1024: + Error("Signle event too large abort "+eventstr[:300]) + continue + + events[providerid]=events.get(providerid)+eventstr + + for key in events.keys(): + if len(events[key]) > 0: + eventSendNumber+=1 + self.SendEvent(key,events[key]) + if eventSendNumber%3 == 0: + time.sleep(15) + + + def AddSystemInfo(self,eventData): + if not self.issysteminfoinitilized: + self.issysteminfoinitilized=True + try: + self.sysInfo["OSVersion"]=platform.system()+":"+"-".join(DistInfo())+":"+platform.release() + self.sysInfo["GAVersion"]=GuestAgentVersion + self.sysInfo["RAM"]=MyDistro.getTotalMemory() + self.sysInfo["Processors"]=MyDistro.getProcessorCores() + sharedConfig = xml.dom.minidom.parse("/var/lib/waagent/SharedConfig.xml").childNodes[0] + hostEnvConfig= xml.dom.minidom.parse("/var/lib/waagent/HostingEnvironmentConfig.xml").childNodes[0] + gfiles = RunGetOutput("ls -t /var/lib/waagent/GoalState.*.xml")[1] + goalStateConfi = xml.dom.minidom.parse(gfiles.split("\n")[0]).childNodes[0] + self.sysInfo["TenantName"]=hostEnvConfig.getElementsByTagName("Deployment")[0].getAttribute("name") + self.sysInfo["RoleName"]=hostEnvConfig.getElementsByTagName("Role")[0].getAttribute("name") + self.sysInfo["RoleInstanceName"]=sharedConfig.getElementsByTagName("Instance")[0].getAttribute("id") + self.sysInfo["ContainerId"]=goalStateConfi.getElementsByTagName("ContainerId")[0].childNodes[0].nodeValue + except: + Error(traceback.format_exc()) + + eventObject = xml.dom.minidom.parseString(eventData.encode("utf-8")).childNodes[0] + for node in eventObject.childNodes: + if node.tagName == "Param": + name = node.getAttribute("Name") + if self.sysInfo.get(name): + node.setAttribute("Value",xml.sax.saxutils.escape(str(self.sysInfo[name]))) + + return eventObject.toxml() + + class Agent(Util): """ Primary object container for the provisioning process. @@ -4174,7 +4903,7 @@ class Agent(Util): missingDefaultRoute = True try: if DistInfo()[0] == 'FreeBSD': - routes = RunGetOutput("netstat -nr")[1] + missingDefaultRoute = True else: routes = RunGetOutput("route -n")[1] for line in routes.split('\n'): @@ -4187,9 +4916,12 @@ class Agent(Util): # network unreachable when the default gateway is not set up. ifname=MyDistro.GetInterfaceName() Log("DoDhcpWork: Missing default route - adding broadcast route for DHCP.") - Run("route add 255.255.255.255 dev " + ifname,chk_err=False) # We supress error logging on error. - if MyDistro.dhcp_client_name == 'wickedd-dhcp4': - Run("service " + MyDistro.dhcp_client_name + " stop",chk_err=False) + if DistInfo()[0] == 'FreeBSD': + Run("route add -net 255.255.255.255 -iface " + ifname,chk_err=False) + else: + Run("route add 255.255.255.255 dev " + ifname,chk_err=False) + if MyDistro.isDHCPEnabled(): + MyDistro.stopDHCP() sock.bind(("0.0.0.0", 68)) sock.sendto(sendData, ("<broadcast>", 67)) sock.settimeout(10) @@ -4215,10 +4947,13 @@ class Agent(Util): sock.close() if missingDefaultRoute: #We added this route - delete it - Run("route del 255.255.255.255 dev " + ifname,chk_err=False) # We supress error logging on error. Log("DoDhcpWork: Removing broadcast route for DHCP.") - if MyDistro.dhcp_client_name == 'wickedd-dhcp4': - Run("service " + MyDistro.dhcp_client_name + " start",chk_err=False) + if DistInfo()[0] == 'FreeBSD': + Run("route del -net 255.255.255.255 -iface " + ifname,chk_err=False) + else: + Run("route del 255.255.255.255 dev " + ifname,chk_err=False) # We supress error logging on error. + if MyDistro.isDHCPEnabled(): + MyDistro.startDHCP() return None def UpdateAndPublishHostName(self, name): @@ -4277,7 +5012,7 @@ class Agent(Util): + "</ContainerId><RoleInstanceList><Role><InstanceId>" + self.GoalState.RoleInstanceId + "</InstanceId><Health><State>Ready</State></Health></Role></RoleInstanceList></Container></Health>") - a = self.HttpPost("/machine?comp=health", healthReport) + a = self.HttpPostWithHeaders("/machine?comp=health", healthReport) if a != None: return a.getheader("x-ms-latest-goal-state-incarnation-number") return None @@ -4296,7 +5031,7 @@ class Agent(Util): + "</InstanceId><Health><State>NotReady</State>" + "<Details><SubStatus>" + status + "</SubStatus><Description>" + desc + "</Description></Details>" + "</Health></Role></RoleInstanceList></Container></Health>") - a = self.HttpPost("/machine?comp=health", healthReport) + a = self.HttpPostWithHeaders("/machine?comp=health", healthReport) if a != None: return a.getheader("x-ms-latest-goal-state-incarnation-number") return None @@ -4311,7 +5046,8 @@ class Agent(Util): + "<Id>" + self.GoalState.RoleInstanceId + "</Id>" + "<Properties><Property name=\"CertificateThumbprint\" value=\"" + thumbprint + "\" /></Properties>" + "</RoleInstance></RoleInstances></Container></RoleProperties>") - a = self.HttpPost("/machine?comp=roleProperties", roleProperties) + a = self.HttpPostWithHeaders("/machine?comp=roleProperties", + roleProperties) Log("Posted Role Properties. CertificateThumbprint=" + thumbprint) return a @@ -4516,6 +5252,7 @@ class Agent(Util): if error : Error ("Provisioning image FAILED " + error) return ("Provisioning image FAILED " + error) + Log("Ovf XML process finished") # This is done here because regenerated SSH host key pairs may be potentially overwritten when processing the ovfxml fingerprint = RunGetOutput("ssh-keygen -lf /etc/ssh/ssh_host_" + type + "_key.pub")[1].rstrip().split()[1].replace(':','') self.ReportRoleProperties(fingerprint) @@ -4594,7 +5331,8 @@ class Agent(Util): Openssl = "openssl" self.TransportCert = self.GenerateTransportCert() - + + eventMonitor = None incarnation = None # goalStateIncarnationFromHealthReport currentPort = None # loadBalancerProbePort goalState = None # self.GoalState, instance of GoalState @@ -4620,7 +5358,16 @@ class Agent(Util): if provisionError == None : provisioned = True SetFileContents(LibDir + "/provisioned", "") - + lastCtime = "NOTFIND" + try: + walaConfigFile = MyDistro.getConfigurationPath() + lastCtime = time.ctime(os.path.getctime(walaConfigFile)) + except: + pass + #Get Ctime of wala config, can help identify the base image of this VM + AddExtensionEvent(name="WALA",op=WALAEventOperation.Provision,isSuccess=True, + message="WALA Config Ctime:"+lastCtime) + # # only one port supported # restart server if new port is different than old port @@ -4666,8 +5413,13 @@ class Agent(Util): # report the status/heartbeat results of extension processing if goalState.ExtensionsConfig != None : goalState.ExtensionsConfig.ReportHandlerStatus() + + if not eventMonitor: + eventMonitor = WALAEventMonitor(self.HttpPostWithHeaders) + eventMonitor.StartEventsLoop() time.sleep(25 - sleepToReduceAccessDenied) + WaagentLogrotate = """\ /var/log/waagent.log { @@ -4918,6 +5670,10 @@ def Deprovision(force, deluser): Set hostname to 'localhost.localdomain'. Delete cached system configuration files in /var/lib and /var/lib/waagent. """ + + #Append blank line at the end of file, so the ctime of this file is changed every time + Run("echo ''>>"+ MyDistro.getConfigurationPath()) + SwitchCwd() ovfxml = GetFileContents(LibDir+"/ovf-env.xml") ovfobj = None @@ -4939,8 +5695,6 @@ def Deprovision(force, deluser): return 1 MyDistro.stopAgentService() - if deluser == True: - MyDistro.DeleteAccount(ovfobj.UserName) # Remove SSH host keys regenerateKeys = Config.get("Provisioning.RegenerateSshHostKeyPair") @@ -4954,6 +5708,8 @@ def Deprovision(force, deluser): MyDistro.publishHostname('localhost.localdomain') MyDistro.deprovisionDeleteFiles() + if deluser == True: + MyDistro.DeleteAccount(ovfobj.UserName) return 0 def SwitchCwd(): @@ -4971,6 +5727,8 @@ def Usage(): print("usage: " + sys.argv[0] + " [-verbose] [-force] [-help|-install|-uninstall|-deprovision[+user]|-version|-serialconsole|-daemon]") return 0 + + def main(): """ Instantiate MyDistro, exit if distro class is not defined. @@ -4998,6 +5756,9 @@ def main(): for a in sys.argv[1:]: if re.match("^([-/]*)(help|usage|\?)", a): sys.exit(Usage()) + elif re.match("^([-/]*)version", a): + print(GuestAgentVersion + " running on " + LinuxDistro) + sys.exit(0) elif re.match("^([-/]*)verbose", a): myLogger.verbose = True elif re.match("^([-/]*)force", a): @@ -5023,9 +5784,6 @@ def main(): sys.exit(Deprovision(force, False)) elif re.match("^([-/]*)daemon", a): daemon = True - elif re.match("^([-/]*)version", a): - print(GuestAgentVersion + " running on " + LinuxDistro) - sys.exit(0) elif re.match("^([-/]*)serialconsole", a): AppendToLinuxKernelCmdline("console=ttyS0 earlyprintk=ttyS0") Log("Configured kernel to use ttyS0 as the boot console.") |