from time import time import cPickle as pickle import contextlib import os import sys import weakref from cloudinit.settings import (PER_INSTANCE, PER_ALWAYS, OLD_CLOUD_CONFIG, CLOUD_CONFIG, CFG_BUILTIN, CUR_INSTANCE_LINK) from cloudinit import (get_builtin_cfg, get_base_cfg) from cloudinit import log as logging from cloudinit import parts from cloudinit import sources from cloudinit import util from cloudinit import user_data from cloudinit import handlers LOG = logging.getLogger(__name__) class CloudSemaphores(object): def __init__(self, paths): self.paths = paths # acquire lock on 'name' for given 'freq' and run function 'func' # if 'clear_on_fail' is True and 'func' throws an exception # then remove the lock (so it would run again) def run_functor(self, name, freq, functor, args=None, clear_on_fail=False): if not args: args = [] if self.has_run(name, freq): LOG.debug("%s already ran %s", name, freq) return False with self.lock(name, freq, clear_on_fail) as lock: if not lock: raise RuntimeError("Failed to acquire lock on %s" % name) else: LOG.debug("Running %s with args %s using lock %s", func, args, lock) func(*args) return True @contextlib.contextmanager def lock(self, name, freq, clear_on_fail=False): try: yield self._acquire(name, freq) except: if clear_on_fail: self.clear(name, freq) raise def clear(self, name, freq): sem_file = self._getpath(name, freq) try: util.del_file(sem_file) except IOError: return False return True def _acquire(self, name, freq): if self.has_run(name, freq): return None # This is a race condition since nothing atomic is happening # here, but this should be ok due to the nature of when # and where cloud-init runs... (file writing is not a lock..) sem_file = self._getpath(name, freq) contents = "%s\n" % str(time()) try: util.write_file(sem_file, contents) except (IOError, OSError): return None return sem_file def has_run(self, name, freq): if freq == PER_ALWAYS: return False sem_file = self._get_path(name, freq) if os.path.exists(sem_file): return True return False def _get_path(self, name, freq): sem_path = self.init.get_ipath("sem") if freq == PER_INSTANCE: return os.path.join(sem_path, name) return os.path.join(sem_path, "%s.%s" % (name, freq)) class CloudPaths(object): def __init__(self, init): self.config = CLOUD_CONFIG self.old_config = OLD_CLOUD_CONFIG self.var_dir = VAR_LIB_DIR self.instance_link = CUR_INSTANCE_LINK self.init = weakref.proxy(init) self.upstart_conf_d = "/etc/init" def _get_path_key(self, name): return PATH_MAP.get(name) # get_ipath_cur: get the current instance path for an item def get_ipath_cur(self, name=None): add_on = self._get_path_key(name) ipath = os.path.join(self.var_dir, 'instance') if add_on: ipath = os.path.join(ipath, add_on) return ipath # get_cpath : get the "clouddir" (/var/lib/cloud/) # for a name in dirmap def get_cpath(self, name=None): cpath = self.var_dir add_on = self._get_path_key(name) if add_on: cpath = os.path.join(cpath, add_on) return cpath # get_ipath : get the instance path for a name in pathmap # (/var/lib/cloud/instances//) def get_ipath(self, name=None): iid = self.init.datasource.get_instance_id() ipath = os.path.join(self.var_dir, 'instances', iid) add_on = self._get_path_key(name) if add_on: ipath = os.path.join(ipath, add_on) return ipath class CloudPartData(object): def __init__(self, datasource, paths): self.datasource = datasource self.paths = paths def get_userdata(self): return self.datasource.get_userdata() def get_public_ssh_keys(self): return self.datasource.get_public_ssh_keys() def get_locale(self): return self.datasource.get_locale() def get_mirror(self): return self.datasource.get_local_mirror() def get_hostname(self, fqdn=False): return self.datasource.get_hostname(fqdn=fqdn) def device_name_to_device(self, name): return self.datasource.device_name_to_device(name) def get_ipath_cur(self, name=None): return self.paths.get_ipath_cur(name) def get_cpath(self, name=None): return self.paths.get_cpath(name) def get_ipath(self, name=None): return self.paths.get_ipath(name) class CloudInit(object): def __init__(self, ds_deps=None): self.datasource = None if ds_deps: self.ds_deps = ds_deps else: self.ds_deps = [sources.DEP_FILESYSTEM, sources.DEP_NETWORK] self.paths = CloudPaths(self) self.cfg = self._read_cfg() def _read_cfg_old(self): # support reading the old ConfigObj format file and merging # it into the yaml dictionary try: from configobj import ConfigObj except ImportError: ConfigObj = None if not ConfigObj: return {} old_cfg = ConfigObj(self.paths.old_config_fn) return dict(old_cfg) def read_cfg(self): if not self.cfg: self.cfg = self._read_cfg() return self.cfg def _read_cfg(self): starting_config = get_builtin_cfg() try: conf = get_base_cfg(self.paths.config, starting_config) except Exception: conf = starting_config old_conf = self._read_cfg_old() conf = util.mergedict(conf, old_conf) return conf def restore_from_cache(self): pickled_fn = self.paths.get_ipath_cur('obj_pkl') try: # we try to restore from a current link and static path # by using the instance link, if purge_cache was called # the file wont exist self.datasource = pickle.loads(util.load_file(pickled_fn)) return True except Exception as e: LOG.debug("Failed loading pickled datasource from %s due to %s", pickled_fn, e) return False def write_to_cache(self): pickled_fn = self.paths.get_ipath_cur("obj_pkl") try: contents = pickle.dumps(self.datasource) util.write_file(pickled_fn, contents, mode=0400) except Exception as e: LOG.debug("Failed pickling datasource to %s due to %s", pickled_fn, e) return False def get_data_source(self): if self.datasource: return True if self.restore_from_cache(): LOG.debug("Restored from cache datasource: %s" % self.datasource) return True (ds, dsname) = sources.find_source(self.cfg, self.ds_deps) LOG.debug("Loaded datasource %s:%s", dsname, ds) self.datasource = ds return True def set_cur_instance(self): # Ensure we are hooked into the right symlink for the current instance idir = self.paths.get_ipath() util.del_file(self.paths.instance_link) util.sym_link(idir, self.paths.instance_link) dlist = [] for d in ["handlers", "scripts", "sem"]: dlist.append(os.path.join(idir, d)) util.ensure_dirs(dlist) # Write out information on what is being used for the current instance # and what may have been used for a previous instance... dp = self.paths.get_cpath('data') ds = "%s: %s\n" % (self.datasource.__class__, self.datasource) previous_ds = '' ds_fn = os.path.join(idir, 'datasource') try: previous_ds = util.load_file(ds_fn).strip() except IOError as e: pass if not previous_ds: # TODO: ?? is this right previous_ds = ds util.write_file(ds_fn, ds) util.write_file(os.path.join(dp, 'previous-datasource'), previous_ds) iid = self.datasource.get_instance_id() previous_iid = '' p_iid_fn = os.path.join(dp, 'previous-instance-id') try: previous_iid = util.load_file(p_iid_fn).strip() except IOError as e: pass if not previous_iid: # TODO: ?? is this right previous_iid = iid util.write_file(p_iid_fn, "%s\n" % previous_iid) def update_cache(self): self.write_to_cache() self.store_userdata() def store_userdata(self): raw_ud = "%s" % (self.datasource.get_userdata_raw()) util.write_file(self.paths.get_ipath('userdata_raw'), raw_ud, 0600) ud = "%s" % (self.datasource.get_userdata()) util.write_file(self.paths.get_ipath('userdata'), ud, 0600) def consume_userdata(self, frequency=PER_INSTANCE): cdir = self.paths.get_cpath("handlers") idir = self.paths.get_ipath("handlers") # Add the path to the plugins dir to the top of our list for import # instance dir should be read before cloud-dir sys.path.insert(0, cdir) sys.path.insert(0, idir) # Data will be a little proxy that modules can use data = CloudPartData(self.datasource, self.paths) # This keeps track of all the active handlers handlers = CloudHandlers(self) # Add handlers in cdir potential_handlers = parts.find_module_files(cdir) for (fname, modname) in potential_handlers.iteritems(): try: mod = parts.fixup_module(importer.import_module(modname)) types = handlers.register(mod) LOG.debug("Added handler for [%s] from %s", types, fname) except: LOG.exception("Failed to register handler from %s", fname) def_handlers = handlers.register_defaults() if def_handlers: LOG.debug("Registered default handlers for [%s]", def_handlers) # Init the handlers first # Ensure userdata fetched before activation called = [] for (_mtype, mod) in handlers.iteritems(): if mod in called: continue parts.call_begin(mod, data, frequency) called.append(mod) # Walk the user data part_data = { 'handlers': handlers, 'handlerdir': idir, 'data': data, 'frequency': frequency, 'handlercount': 0, } user_data.walk(data.get_userdata(), parts.walker_callback, data=part_data) # Give callbacks opportunity to finalize called = [] for (_mtype, mod) in handlers.iteritems(): if mod in called: continue parts.call_end(mod, data, frequency) called.append(mod) class CloudHandlers(object): def __init__(self, paths): self.paths = paths self.registered = {} def __contains__(self, item): return self.is_registered(item) def __getitem__(self, key): return self._get_handler(key) def is_registered(self, content_type): return content_type in self.registered def register(self, mod): types = set() for t in mod.list_types(): self.registered[t] = handler types.add(t) return types def _get_handler(self, content_type): return self.registered[content_type] def items(self): return self.registered.items() def iteritems(self): return self.registered.iteritems() def _get_default_handlers(self): def_handlers = [] if self.paths.get_ipath("cloud_config"): def_handlers.append(parts.CloudConfigPartHandler(self.paths.get_ipath("cloud_config"))) if self.paths.get_ipath_cur('scripts'): def_handlers.append(parts.ShellScriptPartHandler(self.paths.get_ipath_cur('scripts'))) if self.paths.get_ipath("boothooks"): def_handlers.append(parts.BootHookPartHandler(self.paths.get_ipath("boothooks"))) if self.paths.upstart_conf_d: def_handlers.append(parts.UpstartJobPartHandler(self.paths.upstart_conf_d)) return def_handlers def register_defaults(self): registered = set() for h in self._get_default_handlers(): for t in h.list_types(): if not self.is_registered(t) self.register_handler(t, h) registered.add(t) return registered class CloudConfig(object): def __init__(self, cfgfile, cloud): self.cloud = cloud self.cfg = self._get_config(cfgfile) self.paths = cloud.paths self.sems = CloudSemaphores(self.paths) def _get_config(self, cfgfile): cfg = None try: cfg = util.read_conf(cfgfile) except: LOG.exception(("Failed loading of cloud config '%s'. " "Continuing with empty config."), cfgfile) if not cfg: cfg = {} ds_cfg = None try: ds_cfg = self.cloud.datasource.get_config_obj() except: LOG.exception("Failed loading of datasource config.") if not ds_cfg: ds_cfg = {} cfg = util.mergedict(cfg, ds_cfg) cloud_cfg = self.cloud.cfg or {} return util.mergedict(cfg, cloud_cfg) def extract(self, name): modname = handlers.form_module_name(name) if not modname: return None return handlers.fixup_module(importer.import_module(modname)) def handle(self, name, mod, args, freq=None): def_freq = mod.frequency if not freq: freq = def_freq c_name = "config-%s" % (name) real_args = [name, self.cfg, self.cloud, LOG, args] return self.sems.run_functor(c_name, freq, mod.handle, real_args)