#!/usr/bin/python
import os
import sys
import platform
import socket
import fcntl
import struct
import array
import re
import tempfile
import unittest
import random
import string
import threading
from time import ctime, sleep
import imp
# waagent has no '.py' therefore create waagent module import manually.
waagent=imp.load_source('waagent','../waagent')
TestingVersion = "$CommitBranch:future$|$LastCommitDate:2013-04-16 15:52:17 -0700$|$LastCommitHash:7ad7c643b2adbac40b1ea4a5b6eb19f0fe971623$"
class WaagentTestCases(unittest.TestCase):
"""
Test cases for waagent
"""
def setUp(self):
"""
Check for root permissions.
Check Distro is supported.
Create a waagent.conf file.
"""
waagent.LoggerInit('/var/log/waagent.log','/dev/console')
if not self.AmIRoot():
raise Exception('I need to run as root')
DistroName=platform.dist()[0]
self.failUnless(hasattr(waagent,DistroName+'Distro') == True,DistroName+' is not a supported linux distribution.')
waagent.MyDistro=getattr(waagent,DistroName+'Distro')()
# set up /etc/waagent.conf
with open('/etc/waagent.conf','wb') as f:
f.write(waagent.WaagentConf)
f.close()
def tearDown(self):
"""
Remove test resources.
This is a stub
"""
pass
def AmIRoot(self):
"""
Check that our uid is root.
"""
return 'root' in waagent.RunGetOutput('id')[1]
def writetothelog(self,id):
"""
Convienence function.
Used by testTwoLogWritingThreads()
Write 'start', sleep for id seconds and
write 'end' to the logfile.
"""
waagent.Log(str(id)+' start ')
sleep(id)
waagent.Log(str(id)+' end ')
def noop(self,arg2):
"""
Set a method to noop() to prevent its operation.
"""
pass
##############TESTCASES##########################
###############Astract Distro - Concrete Distro Tests##############
def testMyDistroMemberVariables(self):
"""
Ensure that required Distro properties are not None.
"""
assert waagent.MyDistro.agent_service_name is not None , 'MyDistro.agent_service_name must not be None'
assert waagent.MyDistro.selinux is not None , 'MyDistro.selinux must not be None'
assert waagent.MyDistro.ssh_service_name is not None , 'MyDistro.ssh_service_name must not be None'
assert waagent.MyDistro.ssh_config_file is not None , 'MyDistro.ssh_config_file must not be None'
assert waagent.MyDistro.hostname_file_path is not None , 'MyDistro.hostname_file_path must not be None'
assert waagent.MyDistro.dhcp_client_name is not None , 'MyDistro.dhcp_client_name must not be None'
assert waagent.MyDistro.requiredDeps is not None , 'MyDistro.requiredDeps must not be None'
assert waagent.MyDistro.init_script_file is not None , 'MyDistro.init_script_file must not be None'
assert waagent.MyDistro.agent_package_name is not None , 'MyDistro.agent_package_name must not be None'
assert waagent.MyDistro.fileBlackList is not None , 'MyDistro.fileBlackList must not be None'
assert waagent.MyDistro.agent_files_to_uninstall is not None , 'MyDistro.agent_files_to_uninstall must not be None'
assert waagent.MyDistro.grubKernelBootOptionsFile is not None , 'MyDistro.grubKernelBootOptionsFile must not be None'
assert waagent.MyDistro.grubKernelBootOptionsLine is not None , 'MyDistro.grubKernelBootOptionsLine must not be None'
def testMyDistro_restartSshService(self):
"""
Test MyDistro.restartSshService()
"""
cmd = 'service '+ waagent.MyDistro.ssh_service_name + ' status'
sshpid=string.rsplit(waagent.RunGetOutput(cmd)[1],' ',1)
waagent.MyDistro.restartSshService()
assert sshpid is not string.rsplit(waagent.RunGetOutput(cmd)[1],' ',1),'ssh server pid is unchanged.'
# def testMyDistro_checkPackageInstalled(self):
# """MyDistro can check if WaLinuxAgent package is installed"""
# assert waagent.MyDistro.checkPackageInstalled(waagent.MyDistro.agent_package_name) != 0, waagent.MyDistro.agent_package_name+' is Not Installed.'
# def testMyDistro_checkPackageUpdateable(self):
# """MyDistro can check if WaLinuxAgent package is updateable to new version."""
# assert waagent.MyDistro.checkPackageUpdateable(waagent.MyDistro.agent_package_name) == 0 , waagent.MyDistro.agent_package_name+' is not updateable.'
def testMyDistro_isSelinuxSystem(self):
"""
MyDistro can perform Selinux operations.
Test MyDistro.isSelinuxSystem, if true then also test:
MyDistro.isSelinuxRunning
MyDistro.setSelinuxEnforce
MyDistro.setSelinuxContext
"""
selinux=waagent.MyDistro.isSelinuxSystem()
if selinux:
assert waagent.MyDistro.isSelinuxRunning(), 'Selinux not running.'
assert waagent.MyDistro.setSelinuxEnforce(0), 'Unable to call setenforce(0).'
assert waagent.MyDistro.setSelinuxContext('./test_waagent.py','unconfined_u:object_r:ssh_home_t:s0'), 'Unable to set Selinux context.'
assert waagent.MyDistro.setSelinuxEnforce(0), 'Unable to call setenforce(1).'
else:
print 'Selinux not installed. - skipping Selinux tests'
def testMyDistro_load_unload_ata_piix(self):
"""
Attempt to insert and remove ata_piix.ko
by calling MyDistro.load_ata_piix
and MyDistro.unload_ata_piix.
"""
assert waagent.MyDistro.load_ata_piix() == 0, 'Unable to load ata_piix.ko.'
assert waagent.MyDistro.unload_ata_piix() == 0, 'Unable to unload ata_piix.ko.'
def testMyDistro_publishHostname(self):
"""
Test MyDistro.publishHostname
on success, the distro dependent config
contains the hostname, but currently
this test suceeds if the config files were written
without error.
"""
assert waagent.MyDistro.publishHostname('LENG') == 0, 'Error setting hostname to LENG.'
# def testMyDistro_registerAgentService(self):
# assert waagent.MyDistro.registerAgentService() == 0, 'Unable to register agent as service.'
def testMyDistro_setHostname(self):
"""
Test MyDistro.setHostname.
Successfull if hostname is changed.
Reset hostname when finished.
"""
code,oldname = waagent.RunGetOutput('hostname')
waagent.MyDistro.setHostname('HOSTNAMETEST')
code,newname = waagent.RunGetOutput('hostname')
assert 'HOSTNAMETEST' == newname.strip(), 'Unable to set hostname.'
waagent.MyDistro.setHostname(oldname)
def testMyDistro_checkDependencies(self):
"""
Test MyDistro.checkDependencies succeeds
"""
assert waagent.MyDistro.checkDependencies() == 0 , 'Dependency Check failed.'
def testMyDistro_startAgentService(self):
"""
Test MyDistro.startAgentService.
"""
assert waagent.MyDistro.startAgentService() == 0, 'Unable to start ' + waagent.MyDistro.agent_service_name
def testMyDistro_stopAgentService(self):
"""
Test MyDistro.stopAgentService.
"""
assert waagent.MyDistro.stopAgentService() == 0, 'Unable to stop ' + waagent.MyDistro.agent_service_name
def testMyDistro_deleteRootPassword(self):
"""
Test MyDistro.deleteRootPassword.
Restore the shadow file to previous state when finished.
"""
#copy the shadow
waagent.Run('cp /etc/shadow /etc/shadow.keep')
waagent.MyDistro.deleteRootPassword()
assert waagent.Run('grep LOCK /etc/shadow') == 0 , 'Error removing root password.'
# put shadow back
waagent.Run('mv /etc/shadow.keep /etc/shadow')
def testFindIn_AppendTo_RemoveFrom_LinuxKernelCmdline(self):
"""
Test LinuxKernelCmdline operations.
Search for 'splish=splash' in the kernel boot options, expect fail.
Add 'splish=splash'. Search for splish=splash expect success.
Remove 'splish=splash', confirm splish=splash absent
"""
m=waagent.FindInLinuxKernelCmdline('splish=splash')
assert not m, '"splish=splash" was found before i put it there!!! edit it to remove "splish=splash" please.'
waagent.AppendToLinuxKernelCmdline('splish=splash')
m=waagent.FindInLinuxKernelCmdline('splish=splash')
assert m, 'AppendToLinuxKernelCmdline failed, "splish=splash" still not found.'
waagent.RemoveFromLinuxKernelCmdline('splish=splash')
m=waagent.FindInLinuxKernelCmdline('splish=splash')
assert not m, 'RemoveFromLinuxKernelCmdline failed, "splish=splash" still found.'
###############Generic waagent tests##############
def testLogFile(self):
"""
Write a random number with waagent.Log() and read it back.
"""
rnds=str(random.random())
waagent.Log('testLogFile: '+rnds)
found = rnds in (open('/var/log/waagent.log','rb').read())
assert found,'Unable to find '+rnds+' in /var/log/waagent.log'
def testFindReplaceStringInFile(self):
"""
Test file/string operations using
string literals and regular expressions.
Tests:
FindStringInFile
ReplaceStringInFile
"""
fn='/tmp/junk'
if os.path.exists(fn):
os.remove(fn)
sp='splish splash'
yb='yabba dabba do'
open(fn,'wb').write(sp+' I was taking a bath.')
m=waagent.FindStringInFile(fn,sp)
assert m is not None,'waagent.FindStringInFile() Failed: '+sp+' not found in ' + fn + '.'
src=r'^(.*)('+sp+')(.*)$'
rpl=r'\1 '+sp+'\2 '+yb+' \3'
waagent.ReplaceStringInFile(fn,src,rpl)
m=waagent.FindStringInFile(fn,yb)
assert m is not None,'waagent.ReplaceStringInFile() Failed: '+yb+' not found in ' + fn + '.'
def testGetFirstActiveNetworkInterfaceNonLoopback(self):
"""
Test GetFirstActiveNetworkInterfaceNonLoopback.
Fail if iface is 'lo'
"""
addr='null'
iface=waagent.GetFirstActiveNetworkInterfaceNonLoopback()[0]
addr=waagent.GetFirstActiveNetworkInterfaceNonLoopback()[1]
assert len(iface)>1,'Interface name too short'
assert iface is not 'lo','Loopback Interface was returned'
print 'iface=' + iface + ' addr=' + addr
def testTwoLogWritingThreads(self):
"""
Test that two threads writing to the same log
function do not block or scramble messages.
TODO - there is no check for success !!!
"""
for j in range(5):
t1=threading.Thread(target=self.writetothelog,args=(4,))
t2=threading.Thread(target=self.writetothelog,args=(2,))
t1.start()
t2.start()
t1.join()
t2.join()
def testCertificatesParse(self):
"""
TODO - need cert xml from test...
"""
pass
def testSharedConfigParse(self):
"""
Test SharedConfig().Parse returns without error.
"""
assert waagent.SharedConfig().Parse(SHAREDCONFIG), 'Error parsing SharedConfig.xml'
def testOvfEnvParse(self):
"""
Test OvfEnv().Parse returns without error.
"""
assert waagent.OvfEnv().Parse(OVFXML) is not None , 'Failed to Parse ovfxml'
def testOvfEnvProcess(self):
"""
We expect the /var/lib/waagent/Certificates.p7m file exists.
Test ovfenv.Process() return other than None.
"""
assert os.path.exists('/var/lib/waagent/Certificates.p7m') , 'We expect the /var/lib/waagent/Certificates.p7m file exists.'
waagent.WaAgent = waagent.Agent()
ovfenv=waagent.OvfEnv().Parse(OVFXML)
waagent.WaAgent.EnvMonitor = waagent.EnvMonitor()
assert ovfenv.Process() is None , 'Failed to Process ovfxml'
waagent.Run("userdel -f -r myUserName")
def testAgentProvision(self):
"""
TODO - Test Provision in non-fabric environment
"""
waagent.verbose = True
waagent.WaAgent = waagent.Agent()
waagent.WaAgent.EnvMonitor = waagent.EnvMonitor()
waagent.Config = waagent.ConfigurationProvider()
# we cant report our role unless we have one.
waagent.WaAgent.ReportRoleProperties=self.noop
err=waagent.WaAgent.Provision()
assert err == None, 'Provision Failed error ' + str(err)
########################################
OVFXML="""
1.0LinuxProvisioningConfigurationegub13-vmmyUserNamemypasswordfalse2D97B25D49B98ECC90BF1600D66D68799CFB361E/home/myUserName/.ssh/authorized_keys
1.0kms.core.windows.net
"""
SHAREDCONFIG="""
"""
if __name__ == '__main__':
s=unittest.TestLoader().loadTestsFromTestCase(WaagentTestCases)
unittest.TextTestRunner(verbosity=2).run(s)
# import cProfile
# cProfile.run('unittest.TextTestRunner(verbosity=2).run(s)','profile.out')