diff options
author | Joshua Harlow <harlowja@yahoo-inc.com> | 2012-06-08 17:56:15 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2012-06-08 17:56:15 -0700 |
commit | e389ecb7af6387de477b1a50e48044b51e65a98c (patch) | |
tree | 9f6177465e7d0054b1dc1a1af72a00ea660d71da | |
parent | 3e6217b64ba6436af991e52163fea850f21bb770 (diff) | |
download | vyos-cloud-init-e389ecb7af6387de477b1a50e48044b51e65a98c.tar.gz vyos-cloud-init-e389ecb7af6387de477b1a50e48044b51e65a98c.zip |
This now holds the following classes:
CloudInit - cut up to only provide some basic init processes
CloudPartData - provided to handlers so that they can fetch needed data without providing the whole enchilda of cloud init.
CloudPaths - holds the paths that should be used, for instances, for non-instances and such.
CloudSemaphores - holds the concept of cloud inits sempaphores, but cleaned up, using context manager to help here.
CloudHandlers - holds the user data handlers to be activated
CloudConfig - the cloud config object (to be cleaned up)
-rw-r--r-- | cloudinit/cloud.py | 675 |
1 files changed, 372 insertions, 303 deletions
diff --git a/cloudinit/cloud.py b/cloudinit/cloud.py index eb71439b..cfb1c011 100644 --- a/cloudinit/cloud.py +++ b/cloudinit/cloud.py @@ -1,368 +1,437 @@ -import os +from time import time import cPickle as pickle +import contextlib +import os +import sys +import weakref -class CloudInit(object): - cfg = None - part_handlers = {} - old_conffile = '/etc/ec2-init/ec2-config.cfg' - ds_deps = [DataSource.DEP_FILESYSTEM, DataSource.DEP_NETWORK] - datasource = None - cloud_config_str = '' - datasource_name = '' - - builtin_handlers = [] - - def __init__(self, ds_deps=None, sysconfig=system_config): - self.builtin_handlers = [ - ['text/x-shellscript', self.handle_user_script, per_always], - ['text/cloud-config', self.handle_cloud_config, per_always], - ['text/upstart-job', self.handle_upstart_job, per_instance], - ['text/cloud-boothook', self.handle_cloud_boothook, per_always], - ] - - if ds_deps != None: - self.ds_deps = ds_deps +from cloudinit.settings import (PER_INSTANCE, PER_ALWAYS, + OLD_CLOUD_CONFIG, CLOUD_CONFIG, + CFG_BUILTIN, CUR_INSTANCE_LINK) +from cloudinit import (get_builtin_cfg, get_base_cfg) +from cloudinit import log as logging +from cloudinit import parts +from cloudinit import sources +from cloudinit import util +from cloudinit import user_data - self.sysconfig = sysconfig +LOG = logging.getLogger(__name__) - self.cfg = self.read_cfg() - def read_cfg(self): - if self.cfg: - return(self.cfg) +class CloudSemaphores(object): + def __init__(self, paths): + self.paths = paths - try: - conf = util.get_base_cfg(self.sysconfig, cfg_builtin, parsed_cfgs) - except Exception: - conf = get_builtin_cfg() + # acquire lock on 'name' for given 'freq' and run function 'func' + # if 'clear_on_fail' is True and 'func' throws an exception + # then remove the lock (so it would run again) + def run_functor(self, name, freq, functor, args=None, clear_on_fail=False): + if not args: + args = [] + if self.has_run(name, freq): + LOG.debug("%s already ran %s", name, freq) + return False + with self.lock(name, freq, clear_on_fail) as lock: + if not lock: + raise RuntimeError("Failed to acquire lock on %s" % name) + else: + LOG.debug("Running %s with args %s using lock %s", func, args, lock) + func(*args) + return True - # support reading the old ConfigObj format file and merging - # it into the yaml dictionary + @contextlib.contextmanager + def lock(self, name, freq, clear_on_fail=False): try: - from configobj import ConfigObj - oldcfg = ConfigObj(self.old_conffile) - if oldcfg is None: - oldcfg = {} - conf = util.mergedict(conf, oldcfg) + yield self._acquire(name, freq) except: - pass - - return(conf) + if clear_on_fail: + self.clear(name, freq) + raise - def restore_from_cache(self): + def clear(self, name, freq): + sem_file = self._getpath(name, freq) try: - # we try to restore from a current link and static path - # by using the instance link, if purge_cache was called - # the file wont exist - cache = get_ipath_cur('obj_pkl') - f = open(cache, "rb") - data = cPickle.load(f) - f.close() - self.datasource = data - return True - except: + util.del_file(sem_file) + except IOError: return False + return True - def write_to_cache(self): - cache = self.get_ipath("obj_pkl") - try: - os.makedirs(os.path.dirname(cache)) - except OSError as e: - if e.errno != errno.EEXIST: - return False - + def _acquire(self, name, freq): + if self.has_run(name, freq): + return None + # This is a race condition since nothing atomic is happening + # here, but this should be ok due to the nature of when + # and where cloud-init runs... (file writing is not a lock..) + sem_file = self._getpath(name, freq) + contents = "%s\n" % str(time()) try: - f = open(cache, "wb") - cPickle.dump(self.datasource, f) - f.close() - os.chmod(cache, 0400) - except: - raise + util.write_file(sem_file, contents) + except (IOError, OSError): + return None + return sem_file - def get_data_source(self): - if self.datasource is not None: + def has_run(self, name, freq): + if freq == PER_ALWAYS: + return False + sem_file = self._get_path(name, freq) + if os.path.exists(sem_file): return True + return False - if self.restore_from_cache(): - log.debug("restored from cache type %s" % self.datasource) - return True + def _get_path(self, name, freq): + sem_path = self.init.get_ipath("sem") + if freq == PER_INSTANCE: + return os.path.join(sem_path, name) + return os.path.join(sem_path, "%s.%s" % (name, freq)) + + +class CloudPaths(object): + def __init__(self, init): + self.config = CLOUD_CONFIG + self.old_config = OLD_CLOUD_CONFIG + self.var_dir = VAR_LIB_DIR + self.instance_link = CUR_INSTANCE_LINK + self.init = weakref.proxy(init) + self.upstart_conf_d = "/etc/init" + + def _get_path_key(self, name): + return PATH_MAP.get(name) + + # get_ipath_cur: get the current instance path for an item + def get_ipath_cur(self, name=None): + add_on = self._get_path_key(name) + ipath = os.path.join(self.var_dir, 'instance') + if add_on: + ipath = os.path.join(ipath, add_on) + return ipath + + # get_cpath : get the "clouddir" (/var/lib/cloud/<name>) + # for a name in dirmap + def get_cpath(self, name=None): + cpath = self.var_dir + add_on = self._get_path_key(name) + if add_on: + cpath = os.path.join(cpath, add_on) + return cpath - cfglist = self.cfg['datasource_list'] - dslist = list_sources(cfglist, self.ds_deps) - dsnames = [f.__name__ for f in dslist] + # get_ipath : get the instance path for a name in pathmap + # (/var/lib/cloud/instances/<instance>/<name>) + def get_ipath(self, name=None): + iid = self.init.datasource.get_instance_id() + ipath = os.path.join(self.var_dir, 'instances', iid) + add_on = self._get_path_key(name) + if add_on: + ipath = os.path.join(ipath, add_on) + return ipath - log.debug("searching for data source in %s" % dsnames) - for cls in dslist: - ds = cls.__name__ - try: - s = cls(sys_cfg=self.cfg) - if s.get_data(): - self.datasource = s - self.datasource_name = ds - log.debug("found data source %s" % ds) - return True - except Exception as e: - log.warn("get_data of %s raised %s" % (ds, e)) - util.logexc(log) - msg = "Did not find data source. searched classes: %s" % dsnames - log.debug(msg) - raise DataSourceNotFoundException(msg) - def set_cur_instance(self): - try: - os.unlink(cur_instance_link) - except OSError as e: - if e.errno != errno.ENOENT: - raise - - iid = self.get_instance_id() - os.symlink("./instances/%s" % iid, cur_instance_link) - idir = self.get_ipath() - dlist = [] - for d in ["handlers", "scripts", "sem"]: - dlist.append("%s/%s" % (idir, d)) +class CloudPartData(object): + def __init__(self, datasource, paths): + self.datasource = datasource + self.paths = paths - util.ensure_dirs(dlist) + def get_userdata(self): + return self.datasource.get_userdata() - ds = "%s: %s\n" % (self.datasource.__class__, str(self.datasource)) - dp = self.get_cpath('data') - util.write_file("%s/%s" % (idir, 'datasource'), ds) - util.write_file("%s/%s" % (dp, 'previous-datasource'), ds) - util.write_file("%s/%s" % (dp, 'previous-instance-id'), "%s\n" % iid) + def get_public_ssh_keys(self): + return self.datasource.get_public_ssh_keys() - def get_userdata(self): - return(self.datasource.get_userdata()) + def get_locale(self): + return self.datasource.get_locale() - def get_userdata_raw(self): - return(self.datasource.get_userdata_raw()) + def get_mirror(self): + return self.datasource.get_local_mirror() - def get_instance_id(self): - return(self.datasource.get_instance_id()) + def get_hostname(self, fqdn=False): + return self.datasource.get_hostname(fqdn=fqdn) - def update_cache(self): - self.write_to_cache() - self.store_userdata() + def device_name_to_device(self, name): + return self.datasource.device_name_to_device(name) - def store_userdata(self): - util.write_file(self.get_ipath('userdata_raw'), - self.datasource.get_userdata_raw(), 0600) - util.write_file(self.get_ipath('userdata'), - self.datasource.get_userdata(), 0600) - - def sem_getpath(self, name, freq): - if freq == 'once-per-instance': - return("%s/%s" % (self.get_ipath("sem"), name)) - return("%s/%s.%s" % (get_cpath("sem"), name, freq)) - - def sem_has_run(self, name, freq): - if freq == per_always: - return False - semfile = self.sem_getpath(name, freq) - if os.path.exists(semfile): - return True - return False + def get_ipath_cur(self, name=None): + return self.paths.get_ipath_cur(name) - def sem_acquire(self, name, freq): - from time import time - semfile = self.sem_getpath(name, freq) + def get_cpath(self, name=None): + return self.paths.get_cpath(name) - try: - os.makedirs(os.path.dirname(semfile)) - except OSError as e: - if e.errno != errno.EEXIST: - raise e + def get_ipath(self, name=None): + return self.paths.get_ipath(name) - if os.path.exists(semfile) and freq != per_always: - return False - # race condition - try: - f = open(semfile, "w") - f.write("%s\n" % str(time())) - f.close() - except: - return(False) - return(True) +class CloudInit(object): + def __init__(self, ds_deps=None): + self.datasource = None + if ds_deps: + self.ds_deps = ds_deps + else: + self.ds_deps = [sources.DEP_FILESYSTEM, sources.DEP_NETWORK] + self.paths = CloudPaths(self) + self.sems = CloudSemaphores(self.paths) + self.cfg = self._read_cfg() - def sem_clear(self, name, freq): - semfile = self.sem_getpath(name, freq) + def _read_cfg_old(self): + # support reading the old ConfigObj format file and merging + # it into the yaml dictionary try: - os.unlink(semfile) - except OSError as e: - if e.errno != errno.ENOENT: - return False + from configobj import ConfigObj + except ImportError: + ConfigObj = None + if not ConfigObj: + return {} + old_cfg = ConfigObj(self.paths.old_config_fn) + return dict(old_cfg) - return True + def read_cfg(self): + if not self.cfg: + self.cfg = self._read_cfg() + return self.cfg - # acquire lock on 'name' for given 'freq' - # if that does not exist, then call 'func' with given 'args' - # if 'clear_on_fail' is True and func throws an exception - # then remove the lock (so it would run again) - def sem_and_run(self, semname, freq, func, args=None, clear_on_fail=False): - if args is None: - args = [] - if self.sem_has_run(semname, freq): - log.debug("%s already ran %s", semname, freq) + def _read_cfg(self): + starting_config = get_builtin_cfg() + try: + conf = get_base_cfg(self.paths.config, starting_config) + except Exception: + conf = starting_config + old_conf = self._read_cfg_old() + conf = util.mergedict(conf, old_conf) + return conf + + def restore_from_cache(self): + pickled_fn = self.paths.get_ipath_cur('obj_pkl') + try: + # we try to restore from a current link and static path + # by using the instance link, if purge_cache was called + # the file wont exist + self.datasource = pickle.loads(util.load_file(pickled_fn)) + return True + except Exception as e: + LOG.debug("Failed loading pickled datasource from %s due to %s", pickled_fn, e) return False + + def write_to_cache(self): + pickled_fn = self.paths.get_ipath_cur("obj_pkl") try: - if not self.sem_acquire(semname, freq): - raise Exception("Failed to acquire lock on %s" % semname) - - func(*args) - except: - if clear_on_fail: - self.sem_clear(semname, freq) - raise - + contents = pickle.dumps(self.datasource) + util.write_file(pickled_fn, contents, mode=0400) + except Exception as e: + LOG.debug("Failed pickling datasource to %s due to %s", pickled_fn, e) + return False + + def get_data_source(self): + if self.datasource: + return True + if self.restore_from_cache(): + LOG.debug("Restored from cache datasource: %s" % self.datasource) + return True + (ds, dsname) = sources.find_source(self.cfg, self.ds_deps) + LOG.debug("Loaded datasource %s:%s", dsname, ds) + self.datasource = ds return True + + def set_cur_instance(self): + # Ensure we are hooked into the right symlink for the current instance + idir = self.paths.get_ipath() + util.del_file(self.paths.instance_link) + util.sym_link(idir, self.paths.instance_link) - # get_ipath : get the instance path for a name in pathmap - # (/var/lib/cloud/instances/<instance>/name)<name>) - def get_ipath(self, name=None): - return("%s/instances/%s%s" - % (varlibdir, self.get_instance_id(), pathmap[name])) + dlist = [] + for d in ["handlers", "scripts", "sem"]: + dlist.append(os.path.join(idir, d)) + util.ensure_dirs(dlist) - def consume_userdata(self, frequency=per_instance): - self.get_userdata() - data = self + # Write out information on what is being used for the current instance + # and what may have been used for a previous instance... + dp = self.paths.get_cpath('data') + ds = "%s: %s\n" % (self.datasource.__class__, self.datasource) + previous_ds = '' + ds_fn = os.path.join(idir, 'datasource') + try: + previous_ds = util.load_file(ds_fn).strip() + except IOError as e: + pass + if not previous_ds: + # TODO: ?? is this right + previous_ds = ds + util.write_file(ds_fn, ds) + util.write_file(os.path.join(dp, 'previous-datasource'), previous_ds) + iid = self.datasource.get_instance_id() + previous_iid = '' + p_iid_fn = os.path.join(dp, 'previous-instance-id') + try: + previous_iid = util.load_file(p_iid_fn).strip() + except IOError as e: + pass + if not previous_iid: + # TODO: ?? is this right + previous_iid = iid + util.write_file(p_iid_fn, "%s\n" % previous_iid) - cdir = get_cpath("handlers") - idir = self.get_ipath("handlers") + def update_cache(self): + self.write_to_cache() + self.store_userdata() - # add the path to the plugins dir to the top of our list for import + def store_userdata(self): + raw_ud = "%s" % (self.datasource.get_userdata_raw()) + util.write_file(self.paths.get_ipath('userdata_raw'), raw_ud, 0600) + ud = "%s" % (self.datasource.get_userdata()) + util.write_file(self.paths.get_ipath('userdata'), ud, 0600) + + def consume_userdata(self, frequency=PER_INSTANCE): + cdir = self.paths.get_cpath("handlers") + idir = self.paths.get_ipath("handlers") + + # Add the path to the plugins dir to the top of our list for import # instance dir should be read before cloud-dir sys.path.insert(0, cdir) sys.path.insert(0, idir) - part_handlers = {} - # add handlers in cdir - for fname in glob.glob("%s/*.py" % cdir): + # Data will be a little proxy that modules can use + data = CloudPartData(self.datasource, self.paths) + + # This keeps track of all the active handlers + handlers = CloudHandlers(self) + + # Add handlers in cdir + for fname in glob.glob(os.path.join(cdir, "*.py")): if not os.path.isfile(fname): continue modname = os.path.basename(fname)[0:-3] try: - mod = __import__(modname) - handler_register(mod, part_handlers, data, frequency) - log.debug("added handler for [%s] from %s" % (mod.list_types(), - fname)) + mod = parts.fixup_module(importer.import_module(modname)) + types = handlers.register(mod) + LOG.debug("Added handler for [%s] from %s", types, fname) except: - log.warn("failed to initialize handler in %s" % fname) - util.logexc(log) - - # add the internal handers if their type hasn't been already claimed - for (btype, bhand, bfreq) in self.builtin_handlers: - if btype in part_handlers: - continue - handler_register(InternalPartHandler(bhand, [btype], bfreq), - part_handlers, data, frequency) + LOG.exception("Failed to register handler in %s", fname) - # walk the data - pdata = {'handlers': part_handlers, 'handlerdir': idir, - 'data': data, 'frequency': frequency} - UserDataHandler.walk_userdata(self.get_userdata(), - partwalker_callback, data=pdata) + def_handlers = handlers.register_defaults() + if def_handlers: + LOG.debug("Registered default handlers for [%s]", def_handlers) - # give callbacks opportunity to finalize + # Init the handlers first + # Ensure userdata fetched before activation called = [] - for (_mtype, mod) in part_handlers.iteritems(): + for (_mtype, mod) in handlers.iteritems(): if mod in called: continue - handler_call_end(mod, data, frequency) - - def handle_user_script(self, _data, ctype, filename, payload, _frequency): - if ctype == "__end__": - return - if ctype == "__begin__": - # maybe delete existing things here - return - - filename = filename.replace(os.sep, '_') - scriptsdir = get_ipath_cur('scripts') - util.write_file("%s/%s" % - (scriptsdir, filename), util.dos2unix(payload), 0700) - - def handle_upstart_job(self, _data, ctype, filename, payload, frequency): - # upstart jobs are only written on the first boot - if frequency != per_instance: - return - - if ctype == "__end__" or ctype == "__begin__": - return - if not filename.endswith(".conf"): - filename = filename + ".conf" - - util.write_file("%s/%s" % ("/etc/init", filename), - util.dos2unix(payload), 0644) - - def handle_cloud_config(self, _data, ctype, filename, payload, _frequency): - if ctype == "__begin__": - self.cloud_config_str = "" - return - if ctype == "__end__": - cloud_config = self.get_ipath("cloud_config") - util.write_file(cloud_config, self.cloud_config_str, 0600) - - ## this could merge the cloud config with the system config - ## for now, not doing this as it seems somewhat circular - ## as CloudConfig does that also, merging it with this cfg - ## - # ccfg = yaml.load(self.cloud_config_str) - # if ccfg is None: ccfg = {} - # self.cfg = util.mergedict(ccfg, self.cfg) - - return - - self.cloud_config_str += "\n#%s\n%s" % (filename, payload) - - def handle_cloud_boothook(self, _data, ctype, filename, payload, - _frequency): - if ctype == "__end__": - return - if ctype == "__begin__": - return - - filename = filename.replace(os.sep, '_') - payload = util.dos2unix(payload) - prefix = "#cloud-boothook" - start = 0 - if payload.startswith(prefix): - start = len(prefix) + 1 - - boothooks_dir = self.get_ipath("boothooks") - filepath = "%s/%s" % (boothooks_dir, filename) - util.write_file(filepath, payload[start:], 0700) - try: - env = os.environ.copy() - env['INSTANCE_ID'] = self.datasource.get_instance_id() - subprocess.check_call([filepath], env=env) - except subprocess.CalledProcessError as e: - log.error("boothooks script %s returned %i" % - (filepath, e.returncode)) - except Exception as e: - log.error("boothooks unknown exception %s when running %s" % - (e, filepath)) + parts.call_begin(mod, data, frequency) + called.append(mod) + + # Walk the user data + part_data = { + 'handlers': handlers, + 'handlerdir': idir, + 'data': data, + 'frequency': frequency, + 'handlercount': 0, + } + user_data.walk(data.get_userdata(), parts.walker_callback, data=part_data) + + # Give callbacks opportunity to finalize + called = [] + for (_mtype, mod) in handlers.iteritems(): + if mod in called: + continue + parts.call_end(mod, data, frequency) + called.append(mod) - def get_public_ssh_keys(self): - return(self.datasource.get_public_ssh_keys()) - def get_locale(self): - return(self.datasource.get_locale()) +class CloudHandlers(object): - def get_mirror(self): - return(self.datasource.get_local_mirror()) + def __init__(self, paths): + self.paths = paths + self.registered = {} - def get_hostname(self, fqdn=False): - return(self.datasource.get_hostname(fqdn=fqdn)) + def __contains__(self, item): + return self.is_registered(item) - def device_name_to_device(self, name): - return(self.datasource.device_name_to_device(name)) + def __getitem__(self, key): + return self._get_handler(key) - # I really don't know if this should be here or not, but - # I needed it in cc_update_hostname, where that code had a valid 'cloud' - # reference, but did not have a cloudinit handle - # (ie, no cloudinit.get_cpath()) - def get_cpath(self, name=None): - return(get_cpath(name)) + def is_registered(self, content_type): + return content_type in self.registered + + def register(self, mod): + types = set() + for t in mod.list_types(): + self.registered[t] = handler + types.add(t) + return types + + def _get_handler(self, content_type): + return self.registered[content_type] + + def items(self): + return self.registered.items() + + def iteritems(self): + return self.registered.iteritems() + def _get_default_handlers(self): + def_handlers = [] + if self.paths.get_ipath("cloud_config"): + def_handlers.append(parts.CloudConfigPartHandler(self.paths.get_ipath("cloud_config"))) + if self.paths.get_ipath_cur('scripts'): + def_handlers.append(parts.ShellScriptPartHandler(self.paths.get_ipath_cur('scripts'))) + if self.paths.get_ipath("boothooks"): + def_handlers.append(parts.BootHookPartHandler(self.paths.get_ipath("boothooks"))) + if self.paths.upstart_conf_d: + def_handlers.append(parts.UpstartJobPartHandler(self.paths.upstart_conf_d)) + return def_handlers + + def register_defaults(self): + registered = set() + for h in self._get_default_handlers(): + for t in h.list_types(): + if not self.is_registered(t) + self.register_handler(t, h) + registered.add(t) + return registered + + +class CloudConfig(object): + cfgfile = None + cfg = None + + def __init__(self, cfgfile, cloud=None, ds_deps=None): + if cloud == None: + self.cloud = cloudinit.CloudInit(ds_deps) + self.cloud.get_data_source() + else: + self.cloud = cloud + self.cfg = self.get_config_obj(cfgfile) + + def get_config_obj(self, cfgfile): + try: + cfg = util.read_conf(cfgfile) + except: + # TODO: this 'log' could/should be passed in + cloudinit.log.critical("Failed loading of cloud config '%s'. " + "Continuing with empty config\n" % cfgfile) + cloudinit.log.debug(traceback.format_exc() + "\n") + cfg = None + if cfg is None: + cfg = {} + + try: + ds_cfg = self.cloud.datasource.get_config_obj() + except: + ds_cfg = {} + + cfg = util.mergedict(cfg, ds_cfg) + return(util.mergedict(cfg, self.cloud.cfg)) + + def handle(self, name, args, freq=None): + try: + mod = __import__("cc_" + name.replace("-", "_"), globals()) + def_freq = getattr(mod, "frequency", per_instance) + handler = getattr(mod, "handle") + + if not freq: + freq = def_freq + + self.cloud.sem_and_run("config-" + name, freq, handler, + [name, self.cfg, self.cloud, cloudinit.log, args]) + except: + raise |