diff options
-rwxr-xr-x | cloud-init-run-module.py | 20 | ||||
-rwxr-xr-x | ec2-init.py | 12 | ||||
-rw-r--r-- | ec2init/DataSource.py | 18 | ||||
-rw-r--r-- | ec2init/DataSourceEc2.py | 7 | ||||
-rw-r--r-- | ec2init/UserDataHandler.py | 62 | ||||
-rw-r--r-- | ec2init/__init__.py | 209 | ||||
-rw-r--r-- | ec2init/execute.py | 1 | ||||
-rw-r--r-- | upstart/cat-cloud-config.conf | 2 | ||||
-rw-r--r-- | upstart/cloud-run-user-script.conf | 7 |
9 files changed, 230 insertions, 108 deletions
diff --git a/cloud-init-run-module.py b/cloud-init-run-module.py index 0e32c63b..d2d4feee 100755 --- a/cloud-init-run-module.py +++ b/cloud-init-run-module.py @@ -16,7 +16,15 @@ def main(): (freq,semname,modname)=sys.argv[1:4] run_args=sys.argv[4:] - if ec2init.sem_has_run(semname,freq): + cloud = ec2init.EC2Init() + try: + cloud.get_data_source() + except Exception as e: + print e + sys.stderr.write("Failed to get instance data") + sys.exit(1) + + if cloud.sem_has_run(semname,freq): sys.stderr.write("%s already ran %s\n" % (semname,freq)) sys.exit(0) @@ -34,15 +42,7 @@ def main(): if os.environ.has_key(cfg_env_name): cfg_path = os.environ[cfg_env_name] - try: - if not ec2init.sem_acquire(semname,freq): - sys.stderr.write("Failed to acquire lock on %s\n" % semname) - sys.exit(1) - - inst.run(run_args,cfg_path) - except: - ec2init.sem_clear(semname,freq) - raise + cloud.sem_and_run(semname, freq, inst.run, [run_args,cfg_path], False) sys.exit(0) diff --git a/ec2-init.py b/ec2-init.py index 90da7dc6..464bf568 100755 --- a/ec2-init.py +++ b/ec2-init.py @@ -6,6 +6,9 @@ import sys import ec2init +def warn(str): + sys.stderr.write(str) + def main(): cloud = ec2init.EC2Init() @@ -17,13 +20,18 @@ def main(): sys.stderr.write("Failed to get instance data") sys.exit(1) - print "user data is:" + cloud.get_user_data() + #print "user data is:" + cloud.get_user_data() # store the metadata cloud.update_cache() # parse the user data (ec2-run-userdata.py) - # TODO: cloud.consume_user_data() + try: + cloud.sem_and_run("consume_userdata", "once-per-instance", + cloud.consume_userdata,[],False) + except: + warn("consuming user data failed!") + raise # set the defaults (like what ec2-set-defaults.py did) # TODO: cloud.set_defaults() diff --git a/ec2init/DataSource.py b/ec2init/DataSource.py index 23e585e2..af5e9208 100644 --- a/ec2init/DataSource.py +++ b/ec2init/DataSource.py @@ -1,5 +1,6 @@ import ec2init +import UserDataHandler as ud class DataSource: userdata = None @@ -9,21 +10,10 @@ class DataSource: def __init__(self): pass - def store_user_data_raw(self): - fp=fopen(user_data_raw,"wb") - fp.write(self.userdata_raw) - fp.close() - - def store_user_data(self): - fp=fopen(user_data,"wb") - fp.write(self.userdata) - fp.close() - - def get_user_data(self): + def get_userdata(self): if self.userdata == None: - self.userdata = ec2init.preprocess_user_data(self.userdata_raw) - + self.userdata = ud.preprocess_userdata(self.userdata_raw) return self.userdata - def get_user_data_raw(self): + def get_userdata_raw(self): return(self.userdata_raw) diff --git a/ec2init/DataSourceEc2.py b/ec2init/DataSourceEc2.py index 8ee92d29..cc12c97c 100644 --- a/ec2init/DataSourceEc2.py +++ b/ec2init/DataSourceEc2.py @@ -24,7 +24,7 @@ class DataSourceEc2(DataSource.DataSource): def __init__(self): self.meta_data_base_url = 'http://169.254.169.254/%s/meta-data' % self.api_ver - self.user_data_base_url = 'http://169.254.169.254/%s/user-data' % self.api_ver + self.userdata_base_url = 'http://169.254.169.254/%s/user-data' % self.api_ver def get_data(self): try: @@ -49,6 +49,9 @@ class DataSourceEc2(DataSource.DataSource): print e return False + def get_instance_id(self): + return(self.metadata['instance-id']) + def wait_or_bail(self): if self.wait_for_metadata_service(): return True @@ -67,7 +70,7 @@ class DataSourceEc2(DataSource.DataSource): keyids = [line.split('=')[0] for line in data.split('\n')] return [urllib2.urlopen('%s/public-keys/%d/openssh-key' % (self.meta_data_base_url, int(keyid))).read().rstrip() for keyid in keyids] -# def get_user_data(self): +# def get_userdata(self): # return boto.utils.get_instance_userdata() # # def get_instance_metadata(self): diff --git a/ec2init/UserDataHandler.py b/ec2init/UserDataHandler.py index ec265af5..f7c56c69 100644 --- a/ec2init/UserDataHandler.py +++ b/ec2init/UserDataHandler.py @@ -3,13 +3,30 @@ import email from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText + +starts_with_mappings={ + '#include' : 'text/x-include-url', + '#!' : 'text/x-shellscript', + '#cloud-config' : 'text/cloud-config' +} + +# if 'str' is compressed return decompressed otherwise return it +def decomp_str(str): + import StringIO + import gzip + try: + uncomp = gzip.GzipFile(None,"rb",1,StringIO.StringIO(str)).read() + return(uncomp) + except: + return(str) + def do_include(str,parts): import urllib # is just a list of urls, one per line for line in str.splitlines(): if line == "#include": continue content = urllib.urlopen(line).read() - process_includes(email.message_from_string(content),parts) + process_includes(email.message_from_string(decomp_str(content)),parts) def process_includes(msg,parts): # parts is a dictionary of arrays @@ -24,20 +41,24 @@ def process_includes(msg,parts): if part.get_content_maintype() == 'multipart': continue ctype = part.get_content_type() + + payload = part.get_payload() + if ctype is None: - # No guess could be made, or the file is encoded (compressed), so - # use a generic bag-of-bits type. ctype = 'application/octet-stream' + for str, gtype in starts_with_mappings.items(): + if payload.startswith(str): + ctype = gtype - if ctype == 'text/x-include-url' or \ - part.get_payload().startswith("#include"): - do_include(part.get_payload(),parts) + if ctype == 'text/x-include-url': + do_include(payload,parts) continue + filename = part.get_filename() if not filename: filename = 'part-%03d' % len(parts['content']) - parts['content'].append(part.get_payload()) + parts['content'].append(payload) parts['types'].append(ctype) parts['names'].append(filename) @@ -68,11 +89,34 @@ def parts2mime(parts): return(outer.as_string()) # this is heavily wasteful, reads through userdata string input -def preprocess_userdata(str): +def preprocess_userdata(data): parts = { } - process_includes(email.message_from_string(data),parts) + process_includes(email.message_from_string(decomp_str(data)),parts) return(parts2mime(parts)) +# callbacks is a dictionary with: +# { 'content-type': handler(data,content_type,filename,payload) } +def walk_userdata(str, callbacks, data = None): + partnum = 0 + for part in email.message_from_string(str).walk(): + # multipart/* are just containers + if part.get_content_maintype() == 'multipart': + continue + + ctype = part.get_content_type() + if ctype is None: + ctype = 'application/octet-stream' + + filename = part.get_filename() + if not filename: + filename = 'part-%03d' % partnum + + print ":::::::: %s,%s :::::::" % (ctype,filename) + if callbacks.has_key(ctype): + callbacks[ctype](data,ctype,filename,part.get_payload()) + + partnum = partnum+1 + if __name__ == "__main__": import sys data = file(sys.argv[1]).read() diff --git a/ec2init/__init__.py b/ec2init/__init__.py index fe627ecd..3d0ddbaa 100644 --- a/ec2init/__init__.py +++ b/ec2init/__init__.py @@ -20,7 +20,6 @@ import os from configobj import ConfigObj -import boto.utils import cPickle import sys import os.path @@ -29,16 +28,28 @@ import errno datadir = '/var/lib/cloud/data' semdir = '/var/lib/cloud/sem' cachedir = datadir + '/cache' -user_data = datadir + '/user-data.txt' -user_data_raw = datadir + '/user-data.raw' -user_config = datadir + '/user-config.txt' +userdata_raw = datadir + '/user-data.txt' +userdata = datadir + '/user-data.txt.i' +user_scripts_dir = datadir + "/scripts" +cloud_config = datadir + '/cloud-config.txt' data_source_cache = cachedir + '/obj.pkl' cfg_env_name = "CLOUD_CFG" import DataSourceEc2 +import UserDataHandler class EC2Init: datasource_list = [ DataSourceEc2.DataSourceEc2 ] + part_handlers = { } + + def __init__(self): + self.part_handlers = { + 'text/x-shellscript' : self.handle_user_script, + 'text/cloud-config' : self.handle_cloud_config, + 'text/upstart-job' : self.handle_upstart_job, + 'text/part-handler' : self.handle_handler + } + def restore_from_cache(self): try: @@ -72,20 +83,20 @@ class EC2Init: pass raise Exception("Could not find data source") - def get_user_data(self): - return(self.datasource.get_user_data()) + def get_userdata(self): + return(self.datasource.get_userdata()) def update_cache(self): self.write_to_cache() - self.store_user_data() + self.store_userdata() - def store_user_data(self): - f = open(user_data_raw,"wb") - f.write(self.datasource.get_user_data_raw()) + def store_userdata(self): + f = open(userdata_raw,"wb") + f.write(self.datasource.get_userdata_raw()) f.close() - f = open(user_data,"wb") - f.write(self.get_user_data()) + f = open(userdata,"wb") + f.write(self.get_userdata()) f.close() def get_cfg_option_bool(self, key, default=None): @@ -97,63 +108,127 @@ class EC2Init: def initctl_emit(self): import subprocess subprocess.Popen(['initctl', 'emit', 'cloud-config', - '%s=%s' % (cfg_env_name,user_config)]).communicate() - - -# if 'str' is compressed return decompressed otherwise return it -def decomp_str(str): - import StringIO - import gzip - try: - uncomp = gzip.GzipFile(None,"rb",1,StringIO.StringIO(str)).read() - return(uncomp) - except: - return(str) - - -# preprocess the user data (include / uncompress) -def preprocess_user_data(ud): - return(decomp_str(ud)) - -def sem_getpath(name,freq): - # TODO: freqtok must represent "once-per-instance" somehow - freqtok = freq - return("%s/%s.%s" % (semdir,name,freqtok)) - -def sem_has_run(name,freq): - semfile = sem_getpath(name,freq) - if os.path.exists(semfile): + '%s=%s' % (cfg_env_name,cloud_config)]).communicate() + + def sem_getpath(self,name,freq): + freqtok = freq + if freq == 'once-per-instance': + freqtok = self.datasource.get_instance_id() + + return("%s/%s.%s" % (semdir,name,freqtok)) + + def sem_has_run(self,name,freq): + semfile = self.sem_getpath(name,freq) + if os.path.exists(semfile): + return True + return False + + def sem_acquire(self,name,freq): + from time import time + semfile = self.sem_getpath(name,freq) + + try: + os.makedirs(os.path.dirname(semfile)) + except OSError as e: + if e.errno != errno.EEXIST: + raise e + + if os.path.exists(semfile): + return False + + # race condition + try: + f = open(semfile,"w") + f.write("%s\n" % str(time())) + f.close() + except: + return(False) + return(True) + + def sem_clear(self,name,freq): + semfile = self.sem_getpath(name,freq) + try: + os.unlink(semfile) + except OSError as e: + if e.errno != errno.ENOENT: + return False + return True - return False - -def sem_acquire(name,freq): - from time import time - semfile = sem_getpath(name,freq) - try: - os.makedirs(os.path.dirname(semfile)) - except OSError as e: - if e.errno != errno.EEXIST: - raise e + # acquire lock on 'name' for given 'freq' + # if that does not exist, then call 'func' with given 'args' + # if 'clear_on_fail' is True and func throws an exception + # then remove the lock (so it would run again) + def sem_and_run(self,semname,freq,func,args=[],clear_on_fail=False): + if self.sem_has_run(semname,freq): return + try: + if not self.sem_acquire(semname,freq): + raise Exception("Failed to acquire lock on %s\n" % semname) - if os.path.exists(semfile): - return False + func(*args) + except: + if clear_on_fail: + self.sem_clear(semname,freq) + raise + + def consume_userdata(self): + self.get_userdata() + data = self + # give callbacks opportunity to initialize + for ctype, func in self.part_handlers.items(): + func(data, "__begin__",None,None) + UserDataHandler.walk_userdata(self.get_userdata(), + self.part_handlers, data) + + # give callbacks opportunity to finalize + for ctype, func in self.part_handlers.items(): + func(data,"__end__",None,None) + + def handle_handler(self,data,ctype,filename,payload): + if ctype == "__begin__" or ctype == "__end__": return + + # - do something to include the handler, ie, eval it or something + # - call it with '__begin__' + # - add it self.part_handlers + # self.part_handlers['new_type']=handler + print "Do not know what to do with a handler yet, sorry" + + def handle_user_script(self,data,ctype,filename,payload): + if ctype == "__end__": return + if ctype == "__begin__": + # maybe delete existing things here + return + + filename=filename.replace(os.sep,'_') + write_file("%s/%s" % (user_scripts_dir,filename), payload, 0700) + + def handle_upstart_job(self,data,ctype,filename,payload): + if ctype == "__end__" or ctype == "__begin__": return + if not filename.endswith(".conf"): + filename=filename+".conf" + + write_file("%s/%s" % ("/etc/init",filename), payload, 0644) + + def handle_cloud_config(self,data,ctype,filename,payload): + if ctype == "__begin__": + self.cloud_config_str="" + return + if ctype == "__end__": + f=open(cloud_config, "wb") + f.write(self.cloud_config_str) + f.close() + return + + self.cloud_config_str+="\n#%s\n%s" % (filename,payload) + +def write_file(file,content,mode=0644): + try: + os.makedirs(os.path.dirname(file)) + except OSError as e: + if e.errno != errno.EEXIST: + raise e - # race condition - try: - f = open(semfile,"w") - f.write(str(time())) + f=open(file,"wb") + f.write(content) f.close() - except: - return(False) - return(True) - -def sem_clear(name,freq): - semfile = sem_getpath(name,freq) - try: - os.unlink(semfile) - except OSError as e: - if e.errno != errno.ENOENT: - return False - - return True + os.chmod(file,mode) diff --git a/ec2init/execute.py b/ec2init/execute.py index d7386663..033da8f7 100644 --- a/ec2init/execute.py +++ b/ec2init/execute.py @@ -1,6 +1,5 @@ def run(list,cfg): import subprocess - subprocess.Popen(list).communicate() retcode = subprocess.call(list) if retcode == 0: diff --git a/upstart/cat-cloud-config.conf b/upstart/cat-cloud-config.conf index 3f053896..9cc3b2ce 100644 --- a/upstart/cat-cloud-config.conf +++ b/upstart/cat-cloud-config.conf @@ -4,4 +4,4 @@ start on cloud-config console output task -exec cloud-init-run-module once_per_ami catconfig execute cat $CLOUD_CFG +exec cloud-init-run-module once-per-instance catconfig execute cat $CLOUD_CFG diff --git a/upstart/cloud-run-user-script.conf b/upstart/cloud-run-user-script.conf index 49edbea4..31484d01 100644 --- a/upstart/cloud-run-user-script.conf +++ b/upstart/cloud-run-user-script.conf @@ -7,5 +7,8 @@ start on stopped rc RUNLEVEL=[2345] console output task -exec cloud-init-run-module once_per_ami user-scripts execute \ - run-parts "/var/lib/cloud/scripts" +script +sdir=/var/lib/cloud/data/scripts +[ -d "$sdir" ] || exit 0 +exec cloud-init-run-module once_per_ami user-scripts execute run-parts "$sdir" +end script |