diff options
| -rw-r--r-- | cloudinit/cloud.py | 675 | 
1 file changed, 372 insertions(+), 303 deletions(-)
from time import time

try:
    import cPickle as pickle  # Python 2
except ImportError:
    import pickle  # Python 3 fallback

import contextlib
import glob
import os
import sys
import weakref

from cloudinit.settings import (PER_INSTANCE, PER_ALWAYS,
                                OLD_CLOUD_CONFIG, CLOUD_CONFIG,
                                CFG_BUILTIN, CUR_INSTANCE_LINK,
                                PATH_MAP, VAR_LIB_DIR)
from cloudinit import (get_builtin_cfg, get_base_cfg)
from cloudinit import importer
from cloudinit import log as logging
from cloudinit import parts
from cloudinit import sources
from cloudinit import user_data
from cloudinit import util

LOG = logging.getLogger(__name__)


class CloudSemaphores(object):
    """Tracks which (name, frequency) actions have already run.

    A semaphore is simply a marker file under the instance 'sem'
    directory; its presence means the named action already ran for
    that frequency.
    """

    def __init__(self, paths):
        # paths: a CloudPaths used to locate the semaphore directory
        self.paths = paths

    def run_functor(self, name, freq, functor, args=None, clear_on_fail=False):
        """Acquire the lock on 'name' for 'freq' and run functor(*args).

        Returns False when the action has already run for this
        frequency, True after running it.  When clear_on_fail is set and
        the functor raises, the semaphore is removed so the action can
        run again later.
        """
        if not args:
            args = []
        if self.has_run(name, freq):
            LOG.debug("%s already ran %s", name, freq)
            return False
        with self.lock(name, freq, clear_on_fail) as lock:
            if not lock:
                raise RuntimeError("Failed to acquire lock on %s" % name)
            LOG.debug("Running %s with args %s using lock %s",
                      functor, args, lock)
            functor(*args)
        return True

    @contextlib.contextmanager
    def lock(self, name, freq, clear_on_fail=False):
        # Yield the acquired semaphore path (or None when already run /
        # not writable); on any failure in the managed block optionally
        # clear the semaphore before re-raising.
        try:
            yield self._acquire(name, freq)
        except:
            if clear_on_fail:
                self.clear(name, freq)
            raise

    def clear(self, name, freq):
        """Remove the semaphore for (name, freq); True on success."""
        sem_file = self._get_path(name, freq)
        try:
            util.del_file(sem_file)
        except (IOError, OSError):
            return False
        return True

    def _acquire(self, name, freq):
        # Returns the semaphore path written, or None when the action
        # already ran or the marker could not be written.
        if self.has_run(name, freq):
            return None
        # This is a race condition since nothing atomic is happening
        # here, but this should be ok due to the nature of when
        # and where cloud-init runs... (file writing is not a lock..)
        sem_file = self._get_path(name, freq)
        contents = "%s\n" % str(time())
        try:
            util.write_file(sem_file, contents)
        except (IOError, OSError):
            return None
        return sem_file

    def has_run(self, name, freq):
        """True when the semaphore for (name, freq) already exists."""
        if freq == PER_ALWAYS:
            # per-always actions never record completion
            return False
        return os.path.exists(self._get_path(name, freq))

    def _get_path(self, name, freq):
        # Per-instance markers live directly under the sem dir; other
        # frequencies get a ".<freq>" suffix so they do not collide.
        sem_path = self.paths.get_ipath("sem")
        if freq == PER_INSTANCE:
            return os.path.join(sem_path, name)
        return os.path.join(sem_path, "%s.%s" % (name, freq))
class CloudPaths(object):
    """Knows where cloud-init keeps its state on disk.

    Maps item names (via PATH_MAP) onto concrete locations under the
    cloud var dir (typically /var/lib/cloud).
    """

    def __init__(self, init):
        self.config = CLOUD_CONFIG
        self.old_config = OLD_CLOUD_CONFIG
        self.var_dir = VAR_LIB_DIR
        self.instance_link = CUR_INSTANCE_LINK
        # Weak reference back to the owning CloudInit (needed for the
        # datasource instance id) to avoid a reference cycle.
        self.init = weakref.proxy(init)
        self.upstart_conf_d = "/etc/init"

    def _get_path_key(self, name):
        # Translate an item name into its relative path piece, or None
        # when the name is unknown / not given.
        return PATH_MAP.get(name)

    def get_ipath_cur(self, name=None):
        """Return the current-instance path for an item.

        Uses the 'instance' symlink rather than a concrete instance id,
        so it works even before the datasource is known.
        """
        path = os.path.join(self.var_dir, 'instance')
        suffix = self._get_path_key(name)
        if suffix:
            path = os.path.join(path, suffix)
        return path

    def get_cpath(self, name=None):
        """Return the cloud dir path (/var/lib/cloud/<name>) for an item."""
        path = self.var_dir
        suffix = self._get_path_key(name)
        if suffix:
            path = os.path.join(path, suffix)
        return path

    def get_ipath(self, name=None):
        """Return the instance path for an item
        (/var/lib/cloud/instances/<instance-id>/<name>).

        NOTE(review): requires the owning CloudInit to already have a
        datasource attached — confirm callers respect that ordering.
        """
        iid = self.init.datasource.get_instance_id()
        path = os.path.join(self.var_dir, 'instances', iid)
        suffix = self._get_path_key(name)
        if suffix:
            path = os.path.join(path, suffix)
        return path
class CloudPartData(object):
    """Small read-only proxy handed to part handler modules.

    Exposes only the datasource queries and path lookups that handlers
    are expected to use, instead of the whole CloudInit object.
    """

    def __init__(self, datasource, paths):
        self.datasource = datasource
        self.paths = paths

    def get_userdata(self):
        return self.datasource.get_userdata()

    def get_public_ssh_keys(self):
        return self.datasource.get_public_ssh_keys()

    def get_locale(self):
        return self.datasource.get_locale()

    def get_mirror(self):
        # Named get_mirror for handler convenience; delegates to the
        # datasource's local mirror lookup.
        return self.datasource.get_local_mirror()

    def get_hostname(self, fqdn=False):
        return self.datasource.get_hostname(fqdn=fqdn)

    def device_name_to_device(self, name):
        return self.datasource.device_name_to_device(name)

    def get_ipath_cur(self, name=None):
        return self.paths.get_ipath_cur(name)

    def get_cpath(self, name=None):
        return self.paths.get_cpath(name)

    def get_ipath(self, name=None):
        return self.paths.get_ipath(name)
class CloudInit(object):
    """Top level object tying together datasource discovery, caching,
    path lookup, semaphores and user-data consumption."""

    def __init__(self, ds_deps=None):
        # ds_deps: datasource dependency filter; defaults to wanting
        # both a filesystem and a network.
        self.datasource = None
        if ds_deps:
            self.ds_deps = ds_deps
        else:
            self.ds_deps = [sources.DEP_FILESYSTEM, sources.DEP_NETWORK]
        self.paths = CloudPaths(self)
        self.sems = CloudSemaphores(self.paths)
        self.cfg = self._read_cfg()

    def _read_cfg_old(self):
        # Support reading the old ConfigObj format file and merging
        # it into the yaml dictionary.
        try:
            from configobj import ConfigObj
        except ImportError:
            return {}
        # BUGFIX: the paths object exposes 'old_config', not 'old_config_fn'
        old_cfg = ConfigObj(self.paths.old_config)
        return dict(old_cfg)

    def read_cfg(self):
        """Return the merged configuration, reading it on first use."""
        if not self.cfg:
            self.cfg = self._read_cfg()
        return self.cfg

    def _read_cfg(self):
        # Base config falls back to the builtin one when the system
        # config cannot be read; old ConfigObj settings are merged in.
        starting_config = get_builtin_cfg()
        try:
            conf = get_base_cfg(self.paths.config, starting_config)
        except Exception:
            conf = starting_config
        old_conf = self._read_cfg_old()
        return util.mergedict(conf, old_conf)

    def restore_from_cache(self):
        """Try to restore the pickled datasource; True on success."""
        pickled_fn = self.paths.get_ipath_cur('obj_pkl')
        try:
            # We try to restore from a current link and static path
            # by using the instance link; if purge_cache was called
            # the file won't exist.
            self.datasource = pickle.loads(util.load_file(pickled_fn))
            return True
        except Exception as e:
            LOG.debug("Failed loading pickled datasource from %s due to %s",
                      pickled_fn, e)
            return False

    def write_to_cache(self):
        """Pickle the datasource to the instance cache; True on success."""
        pickled_fn = self.paths.get_ipath_cur("obj_pkl")
        try:
            contents = pickle.dumps(self.datasource)
            util.write_file(pickled_fn, contents, mode=0o400)
            return True
        except Exception as e:
            LOG.debug("Failed pickling datasource to %s due to %s",
                      pickled_fn, e)
            return False

    def get_data_source(self):
        """Ensure a datasource is attached (cache first, then search)."""
        if self.datasource:
            return True
        if self.restore_from_cache():
            LOG.debug("Restored from cache datasource: %s", self.datasource)
            return True
        (ds, dsname) = sources.find_source(self.cfg, self.ds_deps)
        LOG.debug("Loaded datasource %s:%s", dsname, ds)
        self.datasource = ds
        return True

    def _read_recorded(self, fn, fallback):
        # Read a previously recorded value, falling back to the current
        # one when no recording exists yet (i.e. on first boot).
        # TODO: ?? is falling back to the current value right
        try:
            prev = util.load_file(fn).strip()
        except IOError:
            prev = ''
        return prev if prev else fallback

    def set_cur_instance(self):
        """Point the 'current instance' symlink at this instance's dir
        and record datasource / instance-id history files."""
        # Ensure we are hooked into the right symlink for the current instance
        idir = self.paths.get_ipath()
        util.del_file(self.paths.instance_link)
        util.sym_link(idir, self.paths.instance_link)

        dlist = [os.path.join(idir, d)
                 for d in ("handlers", "scripts", "sem")]
        util.ensure_dirs(dlist)

        # Write out information on what is being used for the current instance
        # and what may have been used for a previous instance...
        dp = self.paths.get_cpath('data')
        ds = "%s: %s\n" % (self.datasource.__class__, self.datasource)
        ds_fn = os.path.join(idir, 'datasource')
        previous_ds = self._read_recorded(ds_fn, ds)
        util.write_file(ds_fn, ds)
        util.write_file(os.path.join(dp, 'previous-datasource'), previous_ds)

        iid = self.datasource.get_instance_id()
        p_iid_fn = os.path.join(dp, 'previous-instance-id')
        previous_iid = self._read_recorded(p_iid_fn, iid)
        util.write_file(p_iid_fn, "%s\n" % previous_iid)

    def update_cache(self):
        self.write_to_cache()
        self.store_userdata()

    def store_userdata(self):
        """Persist raw and processed user data (owner-readable only)."""
        raw_ud = "%s" % (self.datasource.get_userdata_raw())
        util.write_file(self.paths.get_ipath('userdata_raw'), raw_ud, 0o600)
        ud = "%s" % (self.datasource.get_userdata())
        util.write_file(self.paths.get_ipath('userdata'), ud, 0o600)

    def consume_userdata(self, frequency=PER_INSTANCE):
        """Locate part handlers and walk the user data, feeding each
        part to the handler registered for its content type."""
        cdir = self.paths.get_cpath("handlers")
        idir = self.paths.get_ipath("handlers")

        # Add the path to the plugins dir to the top of our list for import
        # instance dir should be read before cloud-dir
        sys.path.insert(0, cdir)
        sys.path.insert(0, idir)

        # Data will be a little proxy that modules can use
        data = CloudPartData(self.datasource, self.paths)

        # This keeps track of all the active handlers.
        # BUGFIX: the registry needs the paths object, not this CloudInit.
        handlers = CloudHandlers(self.paths)

        # Add handlers found in the cloud handler dir
        for fname in glob.glob(os.path.join(cdir, "*.py")):
            if not os.path.isfile(fname):
                continue
            modname = os.path.basename(fname)[0:-3]
            try:
                mod = parts.fixup_module(importer.import_module(modname))
                types = handlers.register(mod)
                LOG.debug("Added handler for [%s] from %s", types, fname)
            except Exception:
                LOG.exception("Failed to register handler in %s", fname)

        def_handlers = handlers.register_defaults()
        if def_handlers:
            LOG.debug("Registered default handlers for [%s]", def_handlers)

        # Init the handlers first
        # Ensure userdata fetched before activation
        self._call_handlers(handlers, data, frequency, parts.call_begin)

        # Walk the user data
        part_data = {
            'handlers': handlers,
            'handlerdir': idir,
            'data': data,
            'frequency': frequency,
            'handlercount': 0,
        }
        user_data.walk(data.get_userdata(), parts.walker_callback,
                       data=part_data)

        # Give callbacks opportunity to finalize
        self._call_handlers(handlers, data, frequency, parts.call_end)

    def _call_handlers(self, handlers, data, frequency, func):
        # Invoke func once per distinct handler module; a module may be
        # registered for several content types, so dedupe via 'called'.
        called = []
        for (_mtype, mod) in handlers.iteritems():
            if mod in called:
                continue
            func(mod, data, frequency)
            called.append(mod)
                         _frequency): -        if ctype == "__end__": -            return -        if ctype == "__begin__": -            return - -        filename = filename.replace(os.sep, '_') -        payload = util.dos2unix(payload) -        prefix = "#cloud-boothook" -        start = 0 -        if payload.startswith(prefix): -            start = len(prefix) + 1 - -        boothooks_dir = self.get_ipath("boothooks") -        filepath = "%s/%s" % (boothooks_dir, filename) -        util.write_file(filepath, payload[start:], 0700) -        try: -            env = os.environ.copy() -            env['INSTANCE_ID'] = self.datasource.get_instance_id() -            subprocess.check_call([filepath], env=env) -        except subprocess.CalledProcessError as e: -            log.error("boothooks script %s returned %i" % -                (filepath, e.returncode)) -        except Exception as e: -            log.error("boothooks unknown exception %s when running %s" % -                (e, filepath)) +            parts.call_begin(mod, data, frequency) +            called.append(mod) + +        # Walk the user data +        part_data = { +            'handlers': handlers, +            'handlerdir': idir, +            'data': data,  +            'frequency': frequency, +            'handlercount': 0, +        } +        user_data.walk(data.get_userdata(), parts.walker_callback, data=part_data) + +        # Give callbacks opportunity to finalize +        called = [] +        for (_mtype, mod) in handlers.iteritems(): +            if mod in called: +                continue +            parts.call_end(mod, data, frequency) +            called.append(mod) -    def get_public_ssh_keys(self): -        return(self.datasource.get_public_ssh_keys()) -    def get_locale(self): -        return(self.datasource.get_locale()) +class CloudHandlers(object): -    def get_mirror(self): -        return(self.datasource.get_local_mirror()) +    def __init__(self, paths): +        self.paths = 
paths +        self.registered = {} -    def get_hostname(self, fqdn=False): -        return(self.datasource.get_hostname(fqdn=fqdn)) +    def __contains__(self, item): +        return self.is_registered(item) -    def device_name_to_device(self, name): -        return(self.datasource.device_name_to_device(name)) +    def __getitem__(self, key): +        return self._get_handler(key) -    # I really don't know if this should be here or not, but -    # I needed it in cc_update_hostname, where that code had a valid 'cloud' -    # reference, but did not have a cloudinit handle -    # (ie, no cloudinit.get_cpath()) -    def get_cpath(self, name=None): -        return(get_cpath(name)) +    def is_registered(self, content_type): +        return content_type in self.registered + +    def register(self, mod): +        types = set() +        for t in mod.list_types(): +            self.registered[t] = handler +            types.add(t) +        return types + +    def _get_handler(self, content_type): +        return self.registered[content_type] + +    def items(self): +        return self.registered.items() + +    def iteritems(self): +        return self.registered.iteritems() +    def _get_default_handlers(self): +        def_handlers = [] +        if self.paths.get_ipath("cloud_config"): +            def_handlers.append(parts.CloudConfigPartHandler(self.paths.get_ipath("cloud_config"))) +        if self.paths.get_ipath_cur('scripts'): +            def_handlers.append(parts.ShellScriptPartHandler(self.paths.get_ipath_cur('scripts'))) +        if self.paths.get_ipath("boothooks"): +            def_handlers.append(parts.BootHookPartHandler(self.paths.get_ipath("boothooks"))) +        if self.paths.upstart_conf_d: +            def_handlers.append(parts.UpstartJobPartHandler(self.paths.upstart_conf_d)) +        return def_handlers + +    def register_defaults(self): +        registered = set() +        for h in self._get_default_handlers(): +            for t in 
class CloudConfig(object):
    """Merged-configuration view used to run cc_* config modules."""

    cfgfile = None
    cfg = None

    def __init__(self, cfgfile, cloud=None, ds_deps=None):
        # When no CloudInit is supplied, build one and attach its
        # datasource so config/path lookups work.
        if cloud is None:
            self.cloud = CloudInit(ds_deps)
            self.cloud.get_data_source()
        else:
            self.cloud = cloud
        self.cfg = self.get_config_obj(cfgfile)

    def get_config_obj(self, cfgfile):
        """Read cfgfile and merge it with the datasource config and the
        cloud's base config (cfgfile wins, then datasource, then base)."""
        try:
            cfg = util.read_conf(cfgfile)
        except Exception:
            LOG.exception("Failed loading of cloud config '%s'. "
                          "Continuing with empty config", cfgfile)
            cfg = None
        if cfg is None:
            cfg = {}

        try:
            ds_cfg = self.cloud.datasource.get_config_obj()
        except Exception:
            ds_cfg = {}

        cfg = util.mergedict(cfg, ds_cfg)
        return util.mergedict(cfg, self.cloud.cfg)

    def handle(self, name, args, freq=None):
        """Import module cc_<name> and run its handle() exactly once per
        the module's (or caller-supplied) frequency."""
        mod = __import__("cc_" + name.replace("-", "_"), globals())
        # Module may declare its own frequency; fall back to per-instance.
        def_freq = getattr(mod, "frequency", PER_INSTANCE)
        handler = getattr(mod, "handle")
        if not freq:
            freq = def_freq
        # BUGFIX: the semaphore runner now lives on cloud.sems
        # (the old sem_and_run method no longer exists).
        self.cloud.sems.run_functor("config-" + name, freq, handler,
                                    [name, self.cfg, self.cloud, LOG, args])
