summaryrefslogtreecommitdiff
path: root/waagent
diff options
context:
space:
mode:
Diffstat (limited to 'waagent')
-rw-r--r--waagent380
1 files changed, 259 insertions, 121 deletions
diff --git a/waagent b/waagent
index ea6a77a..fcaa71e 100644
--- a/waagent
+++ b/waagent
@@ -78,7 +78,7 @@ if not hasattr(subprocess,'check_output'):
GuestAgentName = "WALinuxAgent"
GuestAgentLongName = "Windows Azure Linux Agent"
-GuestAgentVersion = "WALinuxAgent-2.0.5"
+GuestAgentVersion = "WALinuxAgent-2.0.8"
ProtocolVersion = "2012-11-30" #WARNING this value is used to confirm the correct fabric protocol.
Config = None
@@ -100,6 +100,7 @@ global provisioned
provisioned=False
global provisionError
provisionError=None
+HandlerStatusToAggStatus = {"installed":"Installing", "enabled":"Ready", "unintalled":"NotReady", "disabled":"NotReady"}
WaagentConf = """\
#
@@ -130,6 +131,16 @@ Logs.Verbose=n # Enable verbose logs
OS.RootDeviceScsiTimeout=300 # Root device timeout in seconds.
OS.OpensslPath=None # If "None", the system default version is used.
"""
+README_FILENAME="DATALOSS_WARNING_README.txt"
+README_FILECONTENT="""\
+WARNING: THIS IS A TEMPORARY DISK.
+
+Any data stored on this drive is SUBJECT TO LOSS and THERE IS NO WAY TO RECOVER IT.
+
+Please do not use this disk for storing any personal or application data.
+
+For additional details to please refer to the MSDN documentation at : http://msdn.microsoft.com/en-us/library/windowsazure/jj672979.aspx
+"""
############################################################
# BEGIN DISTRO CLASS DEFS
@@ -282,7 +293,7 @@ class AbstractDistro(object):
sshRestartCmd = self.service_cmd + " " + self.ssh_service_name + " " + self.ssh_service_restart_option
retcode = Run(sshRestartCmd)
if retcode > 0:
- Error("Failed to restart SSH service with return code:" + retcode)
+ Error("Failed to restart SSH service with return code:" + str(retcode))
return retcode
def sshDeployPublicKey(self,fprint,path):
@@ -423,6 +434,9 @@ class AbstractDistro(object):
def GetInterfaceName(self):
return GetFirstActiveNetworkInterfaceNonLoopback()[0]
+ def RestartInterface(self, iface):
+ Run("ifdown " + iface + " && ifup " + iface)
+
def CreateAccount(self,user, password, expiration, thumbprint):
return CreateAccount(user, password, expiration, thumbprint)
@@ -444,26 +458,33 @@ class AbstractDistro(object):
Error("ActivateResourceDisk: Unable to detect disk topology.")
return
device = "/dev/" + device
- for entry in RunGetOutput("mount")[1].split():
- if entry.startswith(device + "1"):
- Log("ActivateResourceDisk: " + device + "1 is already mounted.")
- DiskActivated = True
+
+ mountlist = RunGetOutput("mount")[1]
+ mountpoint = GetMountPoint(mountlist, device)
+
+ if(mountpoint):
+ Log("ActivateResourceDisk: " + device + "1 is already mounted.")
+ else:
+ mountpoint = Config.get("ResourceDisk.MountPoint")
+ if mountpoint == None:
+ mountpoint = "/mnt/resource"
+ CreateDir(mountpoint, "root", 0755)
+ fs = Config.get("ResourceDisk.Filesystem")
+ if fs == None:
+ fs = "ext3"
+ if RunGetOutput("sfdisk -q -c " + device + " 1")[1].rstrip() == "7" and fs != "ntfs":
+ Run("sfdisk -c " + device + " 1 83")
+ Run("mkfs." + fs + " " + device + "1")
+ if Run("mount " + device + "1 " + mountpoint):
+ Error("ActivateResourceDisk: Failed to mount resource disk (" + device + "1).")
return
- mountpoint = Config.get("ResourceDisk.MountPoint")
- if mountpoint == None:
- mountpoint = "/mnt/resource"
- CreateDir(mountpoint, "root", 0755)
- fs = Config.get("ResourceDisk.Filesystem")
- if fs == None:
- fs = "ext3"
- if RunGetOutput("sfdisk -q -c " + device + " 1")[1].rstrip() == "7" and fs != "ntfs":
- Run("sfdisk -c " + device + " 1 83")
- Run("mkfs." + fs + " " + device + "1")
- if Run("mount " + device + "1 " + mountpoint):
- Error("ActivateResourceDisk: Failed to mount resource disk (" + device + "1).")
- return
- Log("Resource disk (" + device + "1) is mounted at " + mountpoint + " with fstype " + fs)
+ Log("Resource disk (" + device + "1) is mounted at " + mountpoint + " with fstype " + fs)
+
+ #Create README file under the root of resource disk
+ SetFileContents(os.path.join(mountpoint,README_FILENAME), README_FILECONTENT)
DiskActivated = True
+
+ #Create swap space
swap = Config.get("ResourceDisk.EnableSwap")
if swap == None or swap.lower().startswith("n"):
return
@@ -524,6 +545,90 @@ class AbstractDistro(object):
SetFileContents(filePath,timeout)
Log("SetBlockDeviceTimeout: Update the device " + device + " with timeout " + timeout)
+ def waitForSshHostKey(self, path):
+ """
+ Provide a dummy waiting, since by default, ssh host key is created by waagent and the key
+ should already been created.
+ """
+ if(os.path.isfile(path)):
+ return True
+ else:
+ Error("Can't find host key: {0}".format(path))
+ return False
+
+############################################################
+# GentooDistro
+############################################################
+gentoo_init_file = """\
+#!/sbin/runscript
+
+command=/usr/sbin/waagent
+pidfile=/var/run/waagent.pid
+command_args=-daemon
+command_background=true
+name="Windows Azure Linux Agent"
+
+depend()
+{
+ needs localmount
+ use logger network
+ after bootmisc modules
+}
+
+"""
+class gentooDistro(AbstractDistro):
+ """
+ Gentoo distro concrete class
+ """
+
+ def __init__(self): #
+ super(gentooDistro,self).__init__()
+ self.service_cmd='/sbin/service'
+ self.ssh_service_name='sshd'
+ self.hostname_file_path='/etc/conf.d/hostname'
+ self.dhcp_client_name='dhcpcd'
+ self.shadow_file_mode=0640
+ self.init_file=gentoo_init_file
+
+ def publishHostname(self,name):
+ try:
+ if (os.path.isfile(self.hostname_file_path)):
+ r=ReplaceFileContentsAtomic(self.hostname_file_path, "hostname=\"" + name + "\"\n"
+ + "\n".join(filter(lambda a: not a.startswith("hostname="), GetFileContents(self.hostname_file_path).split("\n"))))
+ except:
+ return 1
+ return r
+
+ def installAgentServiceScriptFiles(self):
+ SetFileContents(self.init_script_file, self.init_file)
+ os.chmod(self.init_script_file, 0755)
+
+ def registerAgentService(self):
+ self.installAgentServiceScriptFiles()
+ return Run('rc-update add ' + self.agent_service_name + ' default')
+
+ def uninstallAgentService(self):
+ return Run('rc-update del ' + self.agent_service_name + ' default')
+
+ def unregisterAgentService(self):
+ self.stopAgentService()
+ return self.uninstallAgentService()
+
+ def checkPackageInstalled(self,p):
+ if Run('eix -I ^' + p + '$',chk_err=False):
+ return 0
+ else:
+ return 1
+
+ def checkPackageUpdateable(self,p):
+ if Run('eix -u ^' + p + '$',chk_err=False):
+ return 0
+ else:
+ return 1
+
+ def RestartInterface(self, iface):
+ Run("/etc/init.d/net." + iface + " restart")
+
############################################################
# SuSEDistro
############################################################
@@ -1082,6 +1187,18 @@ class UbuntuDistro(debianDistro):
self.dhcp_client_name='dhclient'
return self.dhcp_client_name
+ def waitForSshHostKey(self, path):
+ """
+ Wait until the ssh host key is generated by cloud init.
+ """
+ for retry in range(0, 10):
+ if(os.path.isfile(path)):
+ return True
+ time.sleep(1)
+ Error("Can't find host key: {0}".format(path))
+ return False
+
+
############################################################
# LinuxMintDistro
############################################################
@@ -1305,6 +1422,9 @@ class FreeBSDDistro(AbstractDistro):
iface,inet,mac=self.GetFreeBSDEthernetInfo()
return iface
+ def RestartInterface(self, iface):
+ Run("service netif restart")
+
def GetIpv4Address(self):
"""
Return the ip of the
@@ -1538,8 +1658,8 @@ class FreeBSDDistro(AbstractDistro):
ovfxml = ovfxml[3:] # BOM is not stripped. First three bytes are > 128 and not unicode chars so we ignore them.
ovfxml = ovfxml.strip(chr(0x00))
ovfxml = "".join(filter(lambda x: ord(x)<128, ovfxml))
- ovfxml = re.sub('>.*\Z', '', ovfxml)
- ovfxml += '>'
+ ovfxml = re.sub(r'</Environment>.*\Z','',ovfxml,0,re.DOTALL)
+ ovfxml += '</Environment>'
SetFileContents(location+"/ovf-env.xml", ovfxml)
return retcode,out
@@ -2835,7 +2955,7 @@ class ExtensionsConfig(object):
Log("No RuntimeSettings for " + name + " V " + version)
SimpleLog(p.plugin_log,"No RuntimeSettings for " + name + " V " + version)
- SetFileContents(root +"/config/" + incarnation +".settings", config )
+ SetFileContents(root +"/config/" + seqNo +".settings", config )
#create HandlerEnvironment.json
handler_env='[{ "name": "'+name+'", "seqNo": "'+seqNo+'", "version": 1.0, "handlerEnvironment": { "logFolder": "'+os.path.dirname(p.plugin_log)+'", "configFolder": "' + root + '/config", "statusFolder": "' + root + '/status", "heartbeatFile": "'+ root + '/heartbeat.log"}}]'
SetFileContents(root+'/HandlerEnvironment.json',handler_env)
@@ -2928,7 +3048,7 @@ class ExtensionsConfig(object):
Error("No RuntimeSettings for " + name + " V " + version)
SimpleLog(p.plugin_log,"No RuntimeSettings for " + name + " V " + version)
- SetFileContents(root +"/config/" + incarnation +".settings", config )
+ SetFileContents(root +"/config/" + seqNo +".settings", config )
# state is still enable
if (self.GetHandlerState(handler) == 'NotInstalled'): # run install first if true
@@ -3013,7 +3133,7 @@ class ExtensionsConfig(object):
# launch
pid=None
try:
- child = subprocess.Popen(dirpath+'/'+cmd+arg,shell=True,cwd=dirpath)
+ child = subprocess.Popen(dirpath+'/'+cmd+arg,shell=True,cwd=dirpath,stdout=subprocess.PIPE)
except Exception as e:
Error('Exception launching ' + cmd + str(e))
SimpleLog(plugin_log,'Exception launching ' + cmd + str(e))
@@ -3038,8 +3158,8 @@ class ExtensionsConfig(object):
time.sleep(5)
retry-=1
if retry==0:
- Error('Process exceeded timeout of ' + timeout + ' seconds. Terminating process ' + str(pid))
- SimpleLog(plugin_log,'Process exceeded timeout of ' + timeout + ' seconds. Terminating process ' + str(pid))
+ Error('Process exceeded timeout of ' + str(timeout) + ' seconds. Terminating process ' + str(pid))
+ SimpleLog(plugin_log,'Process exceeded timeout of ' + str(timeout) + ' seconds. Terminating process ' + str(pid))
os.kill(pid,9)
return None
@@ -3079,7 +3199,6 @@ class ExtensionsConfig(object):
return None
status=''
statuses=''
- sent_suffix = '_sent'
for p in self.Plugins:
if p.getAttribute("state") == 'uninstall' or p.getAttribute("restricted") == 'true' :
continue
@@ -3088,27 +3207,14 @@ class ExtensionsConfig(object):
if p.getAttribute("isJson") != 'true':
LogIfVerbose("Plugin " + name+" version: " +version+" is not a JSON Extension. Skipping.")
continue
- status_file=LibDir+'/'+name+'-'+version+'/status/'+incarnation+'.status'
- if os.path.exists(status_file) !=True and os.path.exists(status_file+sent_suffix) != True:
- if p.getAttribute("state") == 'disabled' :
- LogIfVerbose(name+'-'+version+' is disabled. No status to report.')
- continue
- Error("Unable to locate " + status_file)
- Error('Status report '+status_file+' Not sent!')
- continue
- elif os.path.exists(status_file) !=True and os.path.exists(status_file+sent_suffix) == True:
- status_file = status_file+sent_suffix
+ reportHeartbeat = False
+ if len(p.getAttribute("manifestdata"))<1:
+ Error("Failed to get manifestdata.")
+ else:
+ reportHeartbeat = json.loads(p.getAttribute("manifestdata"))[0]['handlerManifest']['reportHeartbeat']
if len(statuses)>0:
statuses+=','
- statuses+=GetFileContents(status_file)
- if status_file.find(sent_suffix) < 0:
- if os.path.exists(status_file + sent_suffix):
- os.remove(status_file + sent_suffix)
- os.rename(status_file,status_file+sent_suffix)
-
- if len(statuses)<1:
- LogIfVerbose('No Handler status to report')
- return None
+ statuses+=self.GenerateAggStatus(name, version, reportHeartbeat)
tstamp=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
#header
#agent state
@@ -3131,72 +3237,85 @@ class ExtensionsConfig(object):
return None
self.Util.Endpoint=uri.split('/')[2]
self.Util.HttpPutBlockBlob(uri, status)
- Log('Status report '+status+' sent to ' + uri)
+ LogIfVerbose('Status report '+status+' sent to ' + uri)
return True
-
- def CheckHeartbeat(self):
- try:
- incarnation=self.Extensions[0].getAttribute("goalStateIncarnation")
- uri=GetNodeTextData(self.Extensions[0].getElementsByTagName("StatusUploadBlob")[0])
- except:
- Error('Error parsing ExtensionsConfig. Unable to check hearbeat.')
- return None
- for p in self.Plugins:
- if p.getAttribute("state") == 'disabled' or p.getAttribute("state") == 'uninstall' or p.getAttribute("restricted") == 'true' :
- continue
- version=p.getAttribute("version")
- name=p.getAttribute("name")
- if p.getAttribute("isJson") != 'true':
- Error("Plugin " + name+" version: " +version+" is not a JSON Extension. Skipping.")
- continue
- try:
- if len(p.getAttribute("manifestdata"))<1 or json.loads(p.getAttribute("manifestdata"))[0]['handlerManifest']['reportHeartbeat']!=True :
- Error("JSON error, unable to process manifestdata")
- continue
- except:
- continue
-
- heartbeat_file=LibDir+'/'+name+'-'+version+'/heartbeat.log'
- status_file=LibDir+'/'+name+'-'+version+'/status/'+incarnation+'.status'
- if not os.path.exists(heartbeat_file):
- Error('Missing '+ heartbeat_file)
- continue
- else:
- heartbeat=GetFileContents(heartbeat_file)
- try:
- hb=json.loads(heartbeat)
- except:
- Error("JSON error, unable to process " + heartbeat_file)
- try:
- d=int(time.time()-os.stat(heartbeat_file).st_mtime)
- except:
- Error("Unable to stat " + heartbeat_file)
- continue
-
- if d >= 700: # stop sending heartbeats
- return 'NotReady'
- if d < 120: # within 2 mins considered active
- return 'Ready'
- if d < 600: # less than 10 mins unknown
- state='Unknown'
- else: # more than 10 mins with no update considered notready
- state='NotReady'
+
+ def GetCurrentSequenceNumber(self, plugin_base_dir):
+ """
+ Get the settings file with biggest file number in config folder
+ """
+ config_dir = os.path.join(plugin_base_dir, 'config')
+ seq_no = 0
+ for subdir, dirs, files in os.walk(config_dir):
+ for file in files:
try:
- stat_rept='{"handlerName":"' + name + '","handlerVersion":"'+version+ '","status":"' +hb[0]['heartbeat']['status'] + '","code":' + hb[0]['heartbeat']['code'] + ',"formattedMessage":{"lang":"en-US","message":"' + hb[0]['heartbeat']['Message'] + '"}}'
- cur_file=status_file+'_current'
- with open(cur_file,'w+') as f:
- f.write(stat_rept)
- # if inc.status exists, rename the inc.status to inc.status_sent
- if os.path.exists(status_file) == True:
- os.rename(status_file,status_file+'_sent')
- # rename inc.status_current to inc.status
- os.rename(cur_file,status_file)
- # remove inc.status_sent
- if os.path.exists(status_file+'_sent') == True:
- os.unlink(status_file+'_sent')
- except:
- Error("Unable to create " + status_file)
+ cur_seq_no = int(os.path.basename(file).split('.')[0])
+ if cur_seq_no > seq_no:
+ seq_no = cur_seq_no
+ except ValueError:
continue
+ return str(seq_no)
+
+
+ def GenerateAggStatus(self, name, version, reportHeartbeat = False):
+ """
+ Generate the status which Azure can understand by the status and heartbeat reported by extension
+ """
+ plugin_base_dir = LibDir+'/'+name+'-'+version+'/'
+ current_seq_no = self.GetCurrentSequenceNumber(plugin_base_dir)
+ status_file=os.path.join(plugin_base_dir, 'status/', current_seq_no +'.status')
+ heartbeat_file = os.path.join(plugin_base_dir, 'heartbeat.log')
+
+ handler_state_file = os.path.join(plugin_base_dir, 'config', 'HandlerState')
+ agg_state = 'NotReady'
+ handler_state = None
+ status_obj = None
+ status_code = None
+ formatted_message = None
+ localized_message = None
+
+ if os.path.exists(handler_state_file):
+ handler_state = GetFileContents(handler_state_file).lower()
+ if HandlerStatusToAggStatus.has_key(handler_state):
+ agg_state = HandlerStatusToAggStatus[handler_state]
+ if reportHeartbeat:
+ if os.path.exists(heartbeat_file):
+ d=int(time.time()-os.stat(heartbeat_file).st_mtime)
+ if d > 600 : # not updated for more than 10 min
+ agg_state = 'Unresponsive'
+ else:
+ try:
+ heartbeat = json.loads(GetFileContents(heartbeat_file))[0]["heartbeat"]
+ agg_state = heartbeat.get("status")
+ status_code = heartbeat.get("code")
+ formatted_message = heartbeat.get("formattedMessage")
+ localized_message = heartbeat.get("message")
+ except:
+ Error("Incorrect heartbeat file. Ignore it. ")
+ else:
+ agg_state = 'Unresponsive'
+ #get status file reported by extension
+ if os.path.exists(status_file):
+ # raw status generated by extension is an array, get the first item and remove the unnecessary element
+ try:
+ status_obj = json.loads(GetFileContents(status_file))[0]
+ del status_obj["version"]
+ except:
+ Error("Incorrect status file. Will NOT settingsStatus in settings. ")
+ agg_status_obj = {"handlerName": name, "handlerVersion": version, "status": agg_state, "runtimeSettingsStatus" :
+ {"sequenceNumber": current_seq_no}}
+ if status_obj:
+ agg_status_obj["runtimeSettingsStatus"]["settingsStatus"] = status_obj
+ if status_code != None:
+ agg_status_obj["code"] = status_code
+ if formatted_message:
+ agg_status_obj["formattedMessage"] = formatted_message
+ if localized_message:
+ agg_status_obj["message"] = localized_message
+ agg_status_string = json.dumps(agg_status_obj)
+ LogIfVerbose("Handler Aggregated Status:" + agg_status_string)
+ return agg_status_string
+
def SetHandlerState(self, handler, state=''):
zip_dir=LibDir+"/" + handler
@@ -3992,7 +4111,7 @@ class Agent(Util):
# http://msdn.microsoft.com/en-us/library/cc227282%28PROT.10%29.aspx
LogIfVerbose("Routes at offset:" + hex(i) + " with length:" + hex(length))
if length < 5:
- Error("Data too small for option " + option)
+ Error("Data too small for option " + str(option))
j = i + 2
while j < (i + length + 2):
maskLengthBits = Ord(receiveBuffer[j])
@@ -4023,7 +4142,7 @@ class Agent(Util):
name = "Windows Azure wire protocol endpoint"
LogIfVerbose(name + ": " + IpAddress + " at " + hex(i))
else:
- Error("HandleDhcpResponse: Data too small for option " + option)
+ Error("HandleDhcpResponse: Data too small for option " + str(option))
else:
LogIfVerbose("Skipping DHCP option " + hex(option) + " at " + hex(i) + " with length " + hex(length))
i += length + 2
@@ -4036,14 +4155,15 @@ class Agent(Util):
"""
ShortSleep = False # Sleep 1 second before retrying DHCP queries.
ifname=None
- Run("iptables -D INPUT -p udp --dport 68 -j ACCEPT",chk_err=False) # We supress error logging on error.
- Run("iptables -I INPUT -p udp --dport 68 -j ACCEPT",chk_err=False) # We supress error logging on error.
sleepDurations = [0, 10, 30, 60, 60]
maxRetry = len(sleepDurations)
lastTry = (maxRetry - 1)
for retry in range(0, maxRetry):
try:
+ #Open DHCP port if iptables is enabled.
+ Run("iptables -D INPUT -p udp --dport 68 -j ACCEPT",chk_err=False) # We supress error logging on error.
+ Run("iptables -I INPUT -p udp --dport 68 -j ACCEPT",chk_err=False) # We supress error logging on error.
strRetry = str(retry)
prefix = "DoDhcpWork: try=" + strRetry
LogIfVerbose(prefix)
@@ -4109,10 +4229,7 @@ class Agent(Util):
Log("Setting host name: " + name)
MyDistro.publishHostname(name)
ethernetInterface = MyDistro.GetInterfaceName()
- if DistInfo()[0] == 'FreeBSD':
- Run("service netif restart")
- else:
- Run("ifdown " + ethernetInterface + " && ifup " + ethernetInterface)
+ MyDistro.RestartInterface(ethernetInterface)
self.RestoreRoutes()
def RestoreRoutes(self):
@@ -4524,8 +4641,11 @@ class Agent(Util):
type = Config.get("Provisioning.SshHostKeyPairType")
if type == None:
type = "rsa"
- fingerprint = RunGetOutput("ssh-keygen -lf /etc/ssh/ssh_host_" + type + "_key.pub")[1].rstrip().split()[1].replace(':','')
- self.ReportRoleProperties(fingerprint)
+
+ host_key_path = "/etc/ssh/ssh_host_" + type + "_key.pub"
+ if(MyDistro.waitForSshHostKey(host_key_path)):
+ fingerprint = RunGetOutput("ssh-keygen -lf /etc/ssh/ssh_host_" + type + "_key.pub")[1].rstrip().split()[1].replace(':','')
+ self.ReportRoleProperties(fingerprint)
if program != None and DiskActivated == True:
try:
@@ -4547,7 +4667,6 @@ class Agent(Util):
# report the status/heartbeat results of extension processing
if goalState.ExtensionsConfig != None :
goalState.ExtensionsConfig.ReportHandlerStatus()
- goalState.ExtensionsConfig.CheckHeartbeat()
time.sleep(25 - sleepToReduceAccessDenied)
@@ -4560,6 +4679,25 @@ WaagentLogrotate = """\
}
"""
+def GetMountPoint(mountlist, device):
+ """
+ Example of mountlist:
+ /dev/sda1 on / type ext4 (rw)
+ proc on /proc type proc (rw)
+ sysfs on /sys type sysfs (rw)
+ devpts on /dev/pts type devpts (rw,gid=5,mode=620)
+ tmpfs on /dev/shm type tmpfs (rw,rootcontext="system_u:object_r:tmpfs_t:s0")
+ none on /proc/sys/fs/binfmt_misc type binfmt_misc (rw)
+ /dev/sdb1 on /mnt/resource type ext4 (rw)
+ """
+ if (mountlist and device):
+ for entry in mountlist.split('\n'):
+ if(re.search(device, entry)):
+ tokens = entry.split()
+ #Return the 3rd column of this line
+ return tokens[2] if len(tokens) > 2 else None
+ return None
+
def FindInLinuxKernelCmdline(option):
"""
Return match object if 'option' is present in the kernel boot options