summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xcloud-init-run-module.py20
-rwxr-xr-xec2-init.py12
-rw-r--r--ec2init/DataSource.py18
-rw-r--r--ec2init/DataSourceEc2.py7
-rw-r--r--ec2init/UserDataHandler.py62
-rw-r--r--ec2init/__init__.py209
-rw-r--r--ec2init/execute.py1
-rw-r--r--upstart/cat-cloud-config.conf2
-rw-r--r--upstart/cloud-run-user-script.conf7
9 files changed, 230 insertions, 108 deletions
diff --git a/cloud-init-run-module.py b/cloud-init-run-module.py
index 0e32c63b..d2d4feee 100755
--- a/cloud-init-run-module.py
+++ b/cloud-init-run-module.py
@@ -16,7 +16,15 @@ def main():
(freq,semname,modname)=sys.argv[1:4]
run_args=sys.argv[4:]
- if ec2init.sem_has_run(semname,freq):
+ cloud = ec2init.EC2Init()
+ try:
+ cloud.get_data_source()
+ except Exception as e:
+ print e
+ sys.stderr.write("Failed to get instance data")
+ sys.exit(1)
+
+ if cloud.sem_has_run(semname,freq):
sys.stderr.write("%s already ran %s\n" % (semname,freq))
sys.exit(0)
@@ -34,15 +42,7 @@ def main():
if os.environ.has_key(cfg_env_name):
cfg_path = os.environ[cfg_env_name]
- try:
- if not ec2init.sem_acquire(semname,freq):
- sys.stderr.write("Failed to acquire lock on %s\n" % semname)
- sys.exit(1)
-
- inst.run(run_args,cfg_path)
- except:
- ec2init.sem_clear(semname,freq)
- raise
+ cloud.sem_and_run(semname, freq, inst.run, [run_args,cfg_path], False)
sys.exit(0)
diff --git a/ec2-init.py b/ec2-init.py
index 90da7dc6..464bf568 100755
--- a/ec2-init.py
+++ b/ec2-init.py
@@ -6,6 +6,9 @@ import sys
import ec2init
+def warn(str):
+ sys.stderr.write(str)
+
def main():
cloud = ec2init.EC2Init()
@@ -17,13 +20,18 @@ def main():
sys.stderr.write("Failed to get instance data")
sys.exit(1)
- print "user data is:" + cloud.get_user_data()
+ #print "user data is:" + cloud.get_user_data()
# store the metadata
cloud.update_cache()
# parse the user data (ec2-run-userdata.py)
- # TODO: cloud.consume_user_data()
+ try:
+ cloud.sem_and_run("consume_userdata", "once-per-instance",
+ cloud.consume_userdata,[],False)
+ except:
+ warn("consuming user data failed!")
+ raise
# set the defaults (like what ec2-set-defaults.py did)
# TODO: cloud.set_defaults()
diff --git a/ec2init/DataSource.py b/ec2init/DataSource.py
index 23e585e2..af5e9208 100644
--- a/ec2init/DataSource.py
+++ b/ec2init/DataSource.py
@@ -1,5 +1,6 @@
import ec2init
+import UserDataHandler as ud
class DataSource:
userdata = None
@@ -9,21 +10,10 @@ class DataSource:
def __init__(self):
pass
- def store_user_data_raw(self):
- fp=fopen(user_data_raw,"wb")
- fp.write(self.userdata_raw)
- fp.close()
-
- def store_user_data(self):
- fp=fopen(user_data,"wb")
- fp.write(self.userdata)
- fp.close()
-
- def get_user_data(self):
+ def get_userdata(self):
if self.userdata == None:
- self.userdata = ec2init.preprocess_user_data(self.userdata_raw)
-
+ self.userdata = ud.preprocess_userdata(self.userdata_raw)
return self.userdata
- def get_user_data_raw(self):
+ def get_userdata_raw(self):
return(self.userdata_raw)
diff --git a/ec2init/DataSourceEc2.py b/ec2init/DataSourceEc2.py
index 8ee92d29..cc12c97c 100644
--- a/ec2init/DataSourceEc2.py
+++ b/ec2init/DataSourceEc2.py
@@ -24,7 +24,7 @@ class DataSourceEc2(DataSource.DataSource):
def __init__(self):
self.meta_data_base_url = 'http://169.254.169.254/%s/meta-data' % self.api_ver
- self.user_data_base_url = 'http://169.254.169.254/%s/user-data' % self.api_ver
+ self.userdata_base_url = 'http://169.254.169.254/%s/user-data' % self.api_ver
def get_data(self):
try:
@@ -49,6 +49,9 @@ class DataSourceEc2(DataSource.DataSource):
print e
return False
+ def get_instance_id(self):
+ return(self.metadata['instance-id'])
+
def wait_or_bail(self):
if self.wait_for_metadata_service():
return True
@@ -67,7 +70,7 @@ class DataSourceEc2(DataSource.DataSource):
keyids = [line.split('=')[0] for line in data.split('\n')]
return [urllib2.urlopen('%s/public-keys/%d/openssh-key' % (self.meta_data_base_url, int(keyid))).read().rstrip() for keyid in keyids]
-# def get_user_data(self):
+# def get_userdata(self):
# return boto.utils.get_instance_userdata()
#
# def get_instance_metadata(self):
diff --git a/ec2init/UserDataHandler.py b/ec2init/UserDataHandler.py
index ec265af5..f7c56c69 100644
--- a/ec2init/UserDataHandler.py
+++ b/ec2init/UserDataHandler.py
@@ -3,13 +3,30 @@ import email
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
+
+starts_with_mappings={
+ '#include' : 'text/x-include-url',
+ '#!' : 'text/x-shellscript',
+ '#cloud-config' : 'text/cloud-config'
+}
+
+# if 'str' is compressed return decompressed otherwise return it
+def decomp_str(str):
+ import StringIO
+ import gzip
+ try:
+ uncomp = gzip.GzipFile(None,"rb",1,StringIO.StringIO(str)).read()
+ return(uncomp)
+ except:
+ return(str)
+
def do_include(str,parts):
import urllib
# is just a list of urls, one per line
for line in str.splitlines():
if line == "#include": continue
content = urllib.urlopen(line).read()
- process_includes(email.message_from_string(content),parts)
+ process_includes(email.message_from_string(decomp_str(content)),parts)
def process_includes(msg,parts):
# parts is a dictionary of arrays
@@ -24,20 +41,24 @@ def process_includes(msg,parts):
if part.get_content_maintype() == 'multipart':
continue
ctype = part.get_content_type()
+
+ payload = part.get_payload()
+
if ctype is None:
- # No guess could be made, or the file is encoded (compressed), so
- # use a generic bag-of-bits type.
ctype = 'application/octet-stream'
+ for str, gtype in starts_with_mappings.items():
+ if payload.startswith(str):
+ ctype = gtype
- if ctype == 'text/x-include-url' or \
- part.get_payload().startswith("#include"):
- do_include(part.get_payload(),parts)
+ if ctype == 'text/x-include-url':
+ do_include(payload,parts)
continue
+
filename = part.get_filename()
if not filename:
filename = 'part-%03d' % len(parts['content'])
- parts['content'].append(part.get_payload())
+ parts['content'].append(payload)
parts['types'].append(ctype)
parts['names'].append(filename)
@@ -68,11 +89,34 @@ def parts2mime(parts):
return(outer.as_string())
# this is heavily wasteful, reads through userdata string input
-def preprocess_userdata(str):
+def preprocess_userdata(data):
parts = { }
- process_includes(email.message_from_string(data),parts)
+ process_includes(email.message_from_string(decomp_str(data)),parts)
return(parts2mime(parts))
+# callbacks is a dictionary with:
+# { 'content-type': handler(data,content_type,filename,payload) }
+def walk_userdata(str, callbacks, data = None):
+ partnum = 0
+ for part in email.message_from_string(str).walk():
+ # multipart/* are just containers
+ if part.get_content_maintype() == 'multipart':
+ continue
+
+ ctype = part.get_content_type()
+ if ctype is None:
+ ctype = 'application/octet-stream'
+
+ filename = part.get_filename()
+ if not filename:
+ filename = 'part-%03d' % partnum
+
+ print ":::::::: %s,%s :::::::" % (ctype,filename)
+ if callbacks.has_key(ctype):
+ callbacks[ctype](data,ctype,filename,part.get_payload())
+
+ partnum = partnum+1
+
if __name__ == "__main__":
import sys
data = file(sys.argv[1]).read()
diff --git a/ec2init/__init__.py b/ec2init/__init__.py
index fe627ecd..3d0ddbaa 100644
--- a/ec2init/__init__.py
+++ b/ec2init/__init__.py
@@ -20,7 +20,6 @@
import os
from configobj import ConfigObj
-import boto.utils
import cPickle
import sys
import os.path
@@ -29,16 +28,28 @@ import errno
datadir = '/var/lib/cloud/data'
semdir = '/var/lib/cloud/sem'
cachedir = datadir + '/cache'
-user_data = datadir + '/user-data.txt'
-user_data_raw = datadir + '/user-data.raw'
-user_config = datadir + '/user-config.txt'
+userdata_raw = datadir + '/user-data.txt'
+userdata = datadir + '/user-data.txt.i'
+user_scripts_dir = datadir + "/scripts"
+cloud_config = datadir + '/cloud-config.txt'
data_source_cache = cachedir + '/obj.pkl'
cfg_env_name = "CLOUD_CFG"
import DataSourceEc2
+import UserDataHandler
class EC2Init:
datasource_list = [ DataSourceEc2.DataSourceEc2 ]
+ part_handlers = { }
+
+ def __init__(self):
+ self.part_handlers = {
+ 'text/x-shellscript' : self.handle_user_script,
+ 'text/cloud-config' : self.handle_cloud_config,
+ 'text/upstart-job' : self.handle_upstart_job,
+ 'text/part-handler' : self.handle_handler
+ }
+
def restore_from_cache(self):
try:
@@ -72,20 +83,20 @@ class EC2Init:
pass
raise Exception("Could not find data source")
- def get_user_data(self):
- return(self.datasource.get_user_data())
+ def get_userdata(self):
+ return(self.datasource.get_userdata())
def update_cache(self):
self.write_to_cache()
- self.store_user_data()
+ self.store_userdata()
- def store_user_data(self):
- f = open(user_data_raw,"wb")
- f.write(self.datasource.get_user_data_raw())
+ def store_userdata(self):
+ f = open(userdata_raw,"wb")
+ f.write(self.datasource.get_userdata_raw())
f.close()
- f = open(user_data,"wb")
- f.write(self.get_user_data())
+ f = open(userdata,"wb")
+ f.write(self.get_userdata())
f.close()
def get_cfg_option_bool(self, key, default=None):
@@ -97,63 +108,127 @@ class EC2Init:
def initctl_emit(self):
import subprocess
subprocess.Popen(['initctl', 'emit', 'cloud-config',
- '%s=%s' % (cfg_env_name,user_config)]).communicate()
-
-
-# if 'str' is compressed return decompressed otherwise return it
-def decomp_str(str):
- import StringIO
- import gzip
- try:
- uncomp = gzip.GzipFile(None,"rb",1,StringIO.StringIO(str)).read()
- return(uncomp)
- except:
- return(str)
-
-
-# preprocess the user data (include / uncompress)
-def preprocess_user_data(ud):
- return(decomp_str(ud))
-
-def sem_getpath(name,freq):
- # TODO: freqtok must represent "once-per-instance" somehow
- freqtok = freq
- return("%s/%s.%s" % (semdir,name,freqtok))
-
-def sem_has_run(name,freq):
- semfile = sem_getpath(name,freq)
- if os.path.exists(semfile):
+ '%s=%s' % (cfg_env_name,cloud_config)]).communicate()
+
+ def sem_getpath(self,name,freq):
+ freqtok = freq
+ if freq == 'once-per-instance':
+ freqtok = self.datasource.get_instance_id()
+
+ return("%s/%s.%s" % (semdir,name,freqtok))
+
+ def sem_has_run(self,name,freq):
+ semfile = self.sem_getpath(name,freq)
+ if os.path.exists(semfile):
+ return True
+ return False
+
+ def sem_acquire(self,name,freq):
+ from time import time
+ semfile = self.sem_getpath(name,freq)
+
+ try:
+ os.makedirs(os.path.dirname(semfile))
+ except OSError as e:
+ if e.errno != errno.EEXIST:
+ raise e
+
+ if os.path.exists(semfile):
+ return False
+
+ # race condition
+ try:
+ f = open(semfile,"w")
+ f.write("%s\n" % str(time()))
+ f.close()
+ except:
+ return(False)
+ return(True)
+
+ def sem_clear(self,name,freq):
+ semfile = self.sem_getpath(name,freq)
+ try:
+ os.unlink(semfile)
+ except OSError as e:
+ if e.errno != errno.ENOENT:
+ return False
+
return True
- return False
-
-def sem_acquire(name,freq):
- from time import time
- semfile = sem_getpath(name,freq)
- try:
- os.makedirs(os.path.dirname(semfile))
- except OSError as e:
- if e.errno != errno.EEXIST:
- raise e
+ # acquire lock on 'name' for given 'freq'
+ # if that does not exist, then call 'func' with given 'args'
+ # if 'clear_on_fail' is True and func throws an exception
+ # then remove the lock (so it would run again)
+ def sem_and_run(self,semname,freq,func,args=[],clear_on_fail=False):
+ if self.sem_has_run(semname,freq): return
+ try:
+ if not self.sem_acquire(semname,freq):
+ raise Exception("Failed to acquire lock on %s\n" % semname)
- if os.path.exists(semfile):
- return False
+ func(*args)
+ except:
+ if clear_on_fail:
+ self.sem_clear(semname,freq)
+ raise
+
+ def consume_userdata(self):
+ self.get_userdata()
+ data = self
+ # give callbacks opportunity to initialize
+ for ctype, func in self.part_handlers.items():
+ func(data, "__begin__",None,None)
+ UserDataHandler.walk_userdata(self.get_userdata(),
+ self.part_handlers, data)
+
+ # give callbacks opportunity to finalize
+ for ctype, func in self.part_handlers.items():
+ func(data,"__end__",None,None)
+
+ def handle_handler(self,data,ctype,filename,payload):
+ if ctype == "__begin__" or ctype == "__end__": return
+
+ # - do something to include the handler, ie, eval it or something
+ # - call it with '__begin__'
+ # - add it self.part_handlers
+ # self.part_handlers['new_type']=handler
+ print "Do not know what to do with a handler yet, sorry"
+
+ def handle_user_script(self,data,ctype,filename,payload):
+ if ctype == "__end__": return
+ if ctype == "__begin__":
+ # maybe delete existing things here
+ return
+
+ filename=filename.replace(os.sep,'_')
+ write_file("%s/%s" % (user_scripts_dir,filename), payload, 0700)
+
+ def handle_upstart_job(self,data,ctype,filename,payload):
+ if ctype == "__end__" or ctype == "__begin__": return
+ if not filename.endswith(".conf"):
+ filename=filename+".conf"
+
+ write_file("%s/%s" % ("/etc/init",filename), payload, 0644)
+
+ def handle_cloud_config(self,data,ctype,filename,payload):
+ if ctype == "__begin__":
+ self.cloud_config_str=""
+ return
+ if ctype == "__end__":
+ f=open(cloud_config, "wb")
+ f.write(self.cloud_config_str)
+ f.close()
+ return
+
+ self.cloud_config_str+="\n#%s\n%s" % (filename,payload)
+
+def write_file(file,content,mode=0644):
+ try:
+ os.makedirs(os.path.dirname(file))
+ except OSError as e:
+ if e.errno != errno.EEXIST:
+ raise e
- # race condition
- try:
- f = open(semfile,"w")
- f.write(str(time()))
+ f=open(file,"wb")
+ f.write(content)
f.close()
- except:
- return(False)
- return(True)
-
-def sem_clear(name,freq):
- semfile = sem_getpath(name,freq)
- try:
- os.unlink(semfile)
- except OSError as e:
- if e.errno != errno.ENOENT:
- return False
-
- return True
+ os.chmod(file,mode)
diff --git a/ec2init/execute.py b/ec2init/execute.py
index d7386663..033da8f7 100644
--- a/ec2init/execute.py
+++ b/ec2init/execute.py
@@ -1,6 +1,5 @@
def run(list,cfg):
import subprocess
- subprocess.Popen(list).communicate()
retcode = subprocess.call(list)
if retcode == 0:
diff --git a/upstart/cat-cloud-config.conf b/upstart/cat-cloud-config.conf
index 3f053896..9cc3b2ce 100644
--- a/upstart/cat-cloud-config.conf
+++ b/upstart/cat-cloud-config.conf
@@ -4,4 +4,4 @@ start on cloud-config
console output
task
-exec cloud-init-run-module once_per_ami catconfig execute cat $CLOUD_CFG
+exec cloud-init-run-module once-per-instance catconfig execute cat $CLOUD_CFG
diff --git a/upstart/cloud-run-user-script.conf b/upstart/cloud-run-user-script.conf
index 49edbea4..31484d01 100644
--- a/upstart/cloud-run-user-script.conf
+++ b/upstart/cloud-run-user-script.conf
@@ -7,5 +7,8 @@ start on stopped rc RUNLEVEL=[2345]
console output
task
-exec cloud-init-run-module once_per_ami user-scripts execute \
- run-parts "/var/lib/cloud/scripts"
+script
+sdir=/var/lib/cloud/data/scripts
+[ -d "$sdir" ] || exit 0
+exec cloud-init-run-module once_per_ami user-scripts execute run-parts "$sdir"
+end script