Diffstat (limited to 'cloudinit/stages.py')
-rw-r--r--  cloudinit/stages.py  551
1 files changed, 551 insertions, 0 deletions
diff --git a/cloudinit/stages.py b/cloudinit/stages.py
new file mode 100644
index 00000000..8fd6aa5d
--- /dev/null
+++ b/cloudinit/stages.py
@@ -0,0 +1,551 @@
+# vi: ts=4 expandtab
+#
+# Copyright (C) 2012 Canonical Ltd.
+# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
+# Copyright (C) 2012 Yahoo! Inc.
+#
+# Author: Scott Moser <scott.moser@canonical.com>
+# Author: Juerg Haefliger <juerg.haefliger@hp.com>
+# Author: Joshua Harlow <harlowja@yahoo-inc.com>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License version 3, as
+# published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import cPickle as pickle
+
+import copy
+import os
+import sys
+
+from cloudinit.settings import (PER_INSTANCE, FREQUENCIES, CLOUD_CONFIG)
+
+from cloudinit import handlers
+
+# Default handlers (used if not overridden)
+from cloudinit.handlers import boot_hook as bh_part
+from cloudinit.handlers import cloud_config as cc_part
+from cloudinit.handlers import shell_script as ss_part
+from cloudinit.handlers import upstart_job as up_part
+
+from cloudinit import cloud
+from cloudinit import config
+from cloudinit import distros
+from cloudinit import helpers
+from cloudinit import importer
+from cloudinit import log as logging
+from cloudinit import sources
+from cloudinit import util
+
+LOG = logging.getLogger(__name__)
+
+
+class Init(object):
+ def __init__(self, ds_deps=None):
+ if ds_deps is not None:
+ self.ds_deps = ds_deps
+ else:
+ self.ds_deps = [sources.DEP_FILESYSTEM, sources.DEP_NETWORK]
+ # Created on first use
+ self._cfg = None
+ self._paths = None
+ self._distro = None
+ # Created only when a fetch occurs
+ self.datasource = None
+
+ @property
+ def distro(self):
+ if not self._distro:
+ # Try to find the right class to use
+ scfg = self._extract_cfg('system')
+ name = scfg.pop('distro', 'ubuntu')
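+            # (Illustrative: a 'system_info' section containing, e.g.,
+            #  'distro: rhel' would select that distro class instead of
+            #  the 'ubuntu' default assumed here.)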
+ cls = distros.fetch(name)
+ LOG.debug("Using distro class %s", cls)
+ self._distro = cls(name, scfg, self.paths)
+ return self._distro
+
+ @property
+ def cfg(self):
+ return self._extract_cfg('restricted')
+
+ def _extract_cfg(self, restriction):
+ # Ensure actually read
+ self.read_cfg()
+ # Nobody gets the real config
+ ocfg = copy.deepcopy(self._cfg)
+ if restriction == 'restricted':
+ ocfg.pop('system_info', None)
+ elif restriction == 'system':
+ ocfg = util.get_cfg_by_path(ocfg, ('system_info',), {})
+ elif restriction == 'paths':
+ ocfg = util.get_cfg_by_path(ocfg, ('system_info', 'paths'), {})
+ if not isinstance(ocfg, (dict)):
+ ocfg = {}
+ return ocfg
+
+ @property
+ def paths(self):
+ if not self._paths:
+ path_info = self._extract_cfg('paths')
+ self._paths = helpers.Paths(path_info, self.datasource)
+ return self._paths
+
+ def _initial_subdirs(self):
+ c_dir = self.paths.cloud_dir
+ initial_dirs = [
+ c_dir,
+ os.path.join(c_dir, 'scripts'),
+ os.path.join(c_dir, 'scripts', 'per-instance'),
+ os.path.join(c_dir, 'scripts', 'per-once'),
+ os.path.join(c_dir, 'scripts', 'per-boot'),
+ os.path.join(c_dir, 'seed'),
+ os.path.join(c_dir, 'instances'),
+ os.path.join(c_dir, 'handlers'),
+ os.path.join(c_dir, 'sem'),
+ os.path.join(c_dir, 'data'),
+ ]
+ return initial_dirs
+
+ def purge_cache(self, rm_instance_lnk=True):
+ rm_list = []
+ rm_list.append(self.paths.boot_finished)
+ if rm_instance_lnk:
+ rm_list.append(self.paths.instance_link)
+ for f in rm_list:
+ util.del_file(f)
+ return len(rm_list)
+
+ def initialize(self):
+ self._initialize_filesystem()
+
+ def _initialize_filesystem(self):
+ util.ensure_dirs(self._initial_subdirs())
+ log_file = util.get_cfg_option_str(self.cfg, 'def_log_file')
+ perms = util.get_cfg_option_str(self.cfg, 'syslog_fix_perms')
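+        # (Illustrative: 'syslog_fix_perms' is expected to be a
+        #  "user:group" pair, e.g. "syslog:adm"; the split below turns
+        #  it into the ownership applied to the log file.)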
+ if log_file:
+ util.ensure_file(log_file)
+ if perms:
+ (u, g) = perms.split(':', 1)
+ if u == "-1" or u == "None":
+ u = None
+ if g == "-1" or g == "None":
+ g = None
+ util.chownbyname(log_file, u, g)
+
+ def read_cfg(self, extra_fns=None):
+ # None check so that we don't keep on re-loading if empty
+ if self._cfg is None:
+ self._cfg = self._read_cfg(extra_fns)
+ # LOG.debug("Loaded 'init' config %s", self._cfg)
+
+ def _read_base_cfg(self):
+ base_cfgs = []
+ default_cfg = util.get_builtin_cfg()
+ kern_contents = util.read_cc_from_cmdline()
+ # Kernel/cmdline parameters override system config
+ if kern_contents:
+ base_cfgs.append(util.load_yaml(kern_contents, default={}))
+        # Anything in the conf.d location or the default cloud.cfg location?
+ base_cfgs.append(util.read_conf_with_confd(CLOUD_CONFIG))
+ # And finally the default gets to play
+ if default_cfg:
+ base_cfgs.append(default_cfg)
+ return util.mergemanydict(base_cfgs)
+
+ def _read_cfg(self, extra_fns):
+ no_cfg_paths = helpers.Paths({}, self.datasource)
+ merger = helpers.ConfigMerger(paths=no_cfg_paths,
+ datasource=self.datasource,
+ additional_fns=extra_fns,
+ base_cfg=self._read_base_cfg())
+ return merger.cfg
+
+ def _restore_from_cache(self):
+        # We try to restore from a current link and static path
+        # by using the instance link; if purge_cache was called,
+        # the file won't exist.
+ pickled_fn = self.paths.get_ipath_cur('obj_pkl')
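+        # (With the stock paths this typically resolves to something
+        #  like /var/lib/cloud/instance/obj.pkl; the exact location
+        #  depends on the configured Paths object.)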
+ pickle_contents = None
+ try:
+ pickle_contents = util.load_file(pickled_fn)
+ except Exception:
+ pass
+        # An empty (or missing) file is expected, so just return that
+        # nothing was successfully loaded...
+ if not pickle_contents:
+ return None
+ try:
+ return pickle.loads(pickle_contents)
+ except Exception:
+ util.logexc(LOG, "Failed loading pickled blob from %s", pickled_fn)
+ return None
+
+ def _write_to_cache(self):
+ if not self.datasource:
+ return False
+ pickled_fn = self.paths.get_ipath_cur("obj_pkl")
+ try:
+ pk_contents = pickle.dumps(self.datasource)
+ except Exception:
+ util.logexc(LOG, "Failed pickling datasource %s", self.datasource)
+ return False
+ try:
+ util.write_file(pickled_fn, pk_contents, mode=0400)
+ except Exception:
+ util.logexc(LOG, "Failed pickling datasource to %s", pickled_fn)
+ return False
+ return True
+
+ def _get_datasources(self):
+ # Any config provided???
+ pkg_list = self.cfg.get('datasource_pkg_list') or []
+ # Add the defaults at the end
+ for n in ['', util.obj_name(sources)]:
+ if n not in pkg_list:
+ pkg_list.append(n)
+ cfg_list = self.cfg.get('datasource_list') or []
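+        # (Illustrative: a user config might set, e.g.,
+        #  'datasource_list: [ NoCloud, Ec2 ]' to restrict which
+        #  datasources are probed; the exact names depend on which
+        #  datasource modules are available.)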
+ return (cfg_list, pkg_list)
+
+ def _get_data_source(self):
+ if self.datasource:
+ return self.datasource
+ ds = self._restore_from_cache()
+ if ds:
+ LOG.debug("Restored from cache, datasource: %s", ds)
+ if not ds:
+ (cfg_list, pkg_list) = self._get_datasources()
+            # Deep copy so that user-data handlers can not modify them
+            # (which would affect user-data handlers down the line...)
+ (ds, dsname) = sources.find_source(self.cfg,
+ self.distro,
+ self.paths,
+ copy.deepcopy(self.ds_deps),
+ cfg_list,
+ pkg_list)
+ LOG.debug("Loaded datasource %s - %s", dsname, ds)
+ self.datasource = ds
+        # Ensure we adjust our paths member's datasource
+        # now that we have one (thus allowing ipath to be used)
+ self.paths.datasource = ds
+ return ds
+
+ def _get_instance_subdirs(self):
+ return ['handlers', 'scripts', 'sems']
+
+ def _get_ipath(self, subname=None):
+        # Force a check to see if anything
+        # actually comes back; if not,
+        # then a datasource has not been assigned...
+ instance_dir = self.paths.get_ipath(subname)
+ if not instance_dir:
+ raise RuntimeError(("No instance directory is available."
+ " Has a datasource been fetched??"))
+ return instance_dir
+
+ def _reflect_cur_instance(self):
+ # Remove the old symlink and attach a new one so
+ # that further reads/writes connect into the right location
+ idir = self._get_ipath()
+ util.del_file(self.paths.instance_link)
+ util.sym_link(idir, self.paths.instance_link)
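+        # (e.g. with the stock layout this re-points something like
+        #  /var/lib/cloud/instance at /var/lib/cloud/instances/<id>;
+        #  the actual paths come from the configured Paths object.)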
+
+ # Ensures these dirs exist
+ dir_list = []
+ for d in self._get_instance_subdirs():
+ dir_list.append(os.path.join(idir, d))
+ util.ensure_dirs(dir_list)
+
+ # Write out information on what is being used for the current instance
+ # and what may have been used for a previous instance...
+ dp = self.paths.get_cpath('data')
+
+ # Write what the datasource was and is..
+ ds = "%s: %s" % (util.obj_name(self.datasource), self.datasource)
+ previous_ds = None
+ ds_fn = os.path.join(idir, 'datasource')
+ try:
+ previous_ds = util.load_file(ds_fn).strip()
+ except Exception:
+ pass
+ if not previous_ds:
+ previous_ds = ds
+ util.write_file(ds_fn, "%s\n" % ds)
+ util.write_file(os.path.join(dp, 'previous-datasource'),
+ "%s\n" % (previous_ds))
+
+ # What the instance id was and is...
+ iid = self.datasource.get_instance_id()
+ previous_iid = None
+ iid_fn = os.path.join(dp, 'instance-id')
+ try:
+ previous_iid = util.load_file(iid_fn).strip()
+ except Exception:
+ pass
+ if not previous_iid:
+ previous_iid = iid
+ util.write_file(iid_fn, "%s\n" % iid)
+ util.write_file(os.path.join(dp, 'previous-instance-id'),
+ "%s\n" % (previous_iid))
+ return iid
+
+ def fetch(self):
+ return self._get_data_source()
+
+ def instancify(self):
+ return self._reflect_cur_instance()
+
+ def cloudify(self):
+ # Form the needed options to cloudify our members
+ return cloud.Cloud(self.datasource,
+ self.paths, self.cfg,
+ self.distro, helpers.Runners(self.paths))
+
+ def update(self):
+ if not self._write_to_cache():
+ return
+ self._store_userdata()
+
+ def _store_userdata(self):
+ raw_ud = "%s" % (self.datasource.get_userdata_raw())
+ util.write_file(self._get_ipath('userdata_raw'), raw_ud, 0600)
+ processed_ud = "%s" % (self.datasource.get_userdata())
+ util.write_file(self._get_ipath('userdata'), processed_ud, 0600)
+
+ def _default_userdata_handlers(self):
+ opts = {
+ 'paths': self.paths,
+ 'datasource': self.datasource,
+ }
+ # TODO Hmmm, should we dynamically import these??
+ def_handlers = [
+ cc_part.CloudConfigPartHandler(**opts),
+ ss_part.ShellScriptPartHandler(**opts),
+ bh_part.BootHookPartHandler(**opts),
+ up_part.UpstartJobPartHandler(**opts),
+ ]
+ return def_handlers
+
+ def consume_userdata(self, frequency=PER_INSTANCE):
+ cdir = self.paths.get_cpath("handlers")
+ idir = self._get_ipath("handlers")
+
+        # Add the paths of the handler dirs to the top of our import list;
+        # the instance dir should be searched before the cloud dir
+ if cdir and cdir not in sys.path:
+ sys.path.insert(0, cdir)
+ if idir and idir not in sys.path:
+ sys.path.insert(0, idir)
+
+        # Ensure datasource fetched before activation (just in case)
+ user_data_msg = self.datasource.get_userdata()
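+        # (user_data_msg is expected to be a MIME message; its
+        #  individual parts are what get handed to the registered
+        #  handlers below.)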
+
+ # This keeps track of all the active handlers
+ c_handlers = helpers.ContentHandlers()
+
+ # Add handlers in cdir
+ potential_handlers = util.find_modules(cdir)
+ for (fname, mod_name) in potential_handlers.iteritems():
+ try:
+ mod_locs = importer.find_module(mod_name, [''],
+ ['list_types',
+ 'handle_part'])
+ if not mod_locs:
+ LOG.warn(("Could not find a valid user-data handler"
+ " named %s in file %s"), mod_name, fname)
+ continue
+ mod = importer.import_module(mod_locs[0])
+ mod = handlers.fixup_handler(mod)
+ types = c_handlers.register(mod)
+ LOG.debug("Added handler for %s from %s", types, fname)
+            except Exception:
+ util.logexc(LOG, "Failed to register handler from %s", fname)
+
+ def_handlers = self._default_userdata_handlers()
+ applied_def_handlers = c_handlers.register_defaults(def_handlers)
+ if applied_def_handlers:
+ LOG.debug("Registered default handlers: %s", applied_def_handlers)
+
+ # Form our cloud interface
+ data = self.cloudify()
+
+ # Init the handlers first
+ called = []
+ for (_ctype, mod) in c_handlers.iteritems():
+ if mod in called:
+ continue
+ handlers.call_begin(mod, data, frequency)
+ called.append(mod)
+
+ # Walk the user data
+ part_data = {
+ 'handlers': c_handlers,
+            # Any new handlers that are encountered get written here
+ 'handlerdir': idir,
+ 'data': data,
+ # The default frequency if handlers don't have one
+ 'frequency': frequency,
+            # This will be used when new handlers are found
+            # to help write their contents to files with numbered
+            # names...
+ 'handlercount': 0,
+ }
+ handlers.walk(user_data_msg, handlers.walker_callback, data=part_data)
+
+ # Give callbacks opportunity to finalize
+ called = []
+ for (_ctype, mod) in c_handlers.iteritems():
+ if mod in called:
+ continue
+ handlers.call_end(mod, data, frequency)
+ called.append(mod)
+
+
+class Modules(object):
+ def __init__(self, init, cfg_files=None):
+ self.init = init
+ self.cfg_files = cfg_files
+ # Created on first use
+ self._cached_cfg = None
+
+ @property
+ def cfg(self):
+ # None check to avoid empty case causing re-reading
+ if self._cached_cfg is None:
+ merger = helpers.ConfigMerger(paths=self.init.paths,
+ datasource=self.init.datasource,
+ additional_fns=self.cfg_files,
+ base_cfg=self.init.cfg)
+ self._cached_cfg = merger.cfg
+ # LOG.debug("Loading 'module' config %s", self._cached_cfg)
+ # Only give out a copy so that others can't modify this...
+ return copy.deepcopy(self._cached_cfg)
+
+ def _read_modules(self, name):
+ module_list = []
+ if name not in self.cfg:
+ return module_list
+ cfg_mods = self.cfg[name]
+        # Create 'module_list', an array of hashes where
+        #   hash['mod'] = module name
+        #   hash['freq'] = frequency
+        #   hash['args'] = arguments
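+        # (Illustrative only -- items in a section such as
+        #  'cloud_config_modules' might look like:
+        #    - ssh
+        #    - [ mounts, once-per-instance ]
+        #    - {name: final-message, frequency: always, args: [...]}
+        #  which the loop below normalizes into the hash form above.)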
+ for item in cfg_mods:
+ if not item:
+ continue
+ if isinstance(item, (str, basestring)):
+ module_list.append({
+ 'mod': item.strip(),
+ })
+ elif isinstance(item, (list)):
+ contents = {}
+ # Meant to fall through...
+ if len(item) >= 1:
+ contents['mod'] = item[0].strip()
+ if len(item) >= 2:
+ contents['freq'] = item[1].strip()
+ if len(item) >= 3:
+ contents['args'] = item[2:]
+ if contents:
+ module_list.append(contents)
+ elif isinstance(item, (dict)):
+ contents = {}
+ valid = False
+ if 'name' in item:
+ contents['mod'] = item['name'].strip()
+ valid = True
+ if 'frequency' in item:
+ contents['freq'] = item['frequency'].strip()
+ if 'args' in item:
+ contents['args'] = item['args'] or []
+ if contents and valid:
+ module_list.append(contents)
+ else:
+ raise TypeError(("Failed to read '%s' item in config,"
+ " unknown type %s") %
+ (item, util.obj_name(item)))
+ return module_list
+
+ def _fixup_modules(self, raw_mods):
+ mostly_mods = []
+ for raw_mod in raw_mods:
+ raw_name = raw_mod['mod']
+ freq = raw_mod.get('freq')
+ run_args = raw_mod.get('args') or []
+ mod_name = config.form_module_name(raw_name)
+ if not mod_name:
+ continue
+ if freq and freq not in FREQUENCIES:
+ LOG.warn(("Config specified module %s"
+ " has an unknown frequency %s"), raw_name, freq)
+                # Reset it so that when run it will get set to a known value
+ freq = None
+ mod_locs = importer.find_module(mod_name,
+ ['', util.obj_name(config)],
+ ['handle'])
+ if not mod_locs:
+ LOG.warn("Could not find module named %s", mod_name)
+ continue
+ mod = config.fixup_module(importer.import_module(mod_locs[0]))
+ mostly_mods.append([mod, raw_name, freq, run_args])
+ return mostly_mods
+
+ def _run_modules(self, mostly_mods):
+ d_name = self.init.distro.name
+ cc = self.init.cloudify()
+        # Return which ones ran
+        # and which ones failed + the exception explaining why each failed
+ failures = []
+ which_ran = []
+ for (mod, name, freq, args) in mostly_mods:
+ try:
+                # Try the module's frequency, else fall back to a known one
+                if not freq:
+                    freq = mod.frequency
+                if freq not in FREQUENCIES:
+                    freq = PER_INSTANCE
+ worked_distros = mod.distros
+ if (worked_distros and d_name not in worked_distros):
+ LOG.warn(("Module %s is verified on %s distros"
+ " but not on %s distro. It may or may not work"
+ " correctly."), name, worked_distros, d_name)
+                # Use the config's logger and not our own
+                # TODO: possibly check whether the module has a LOG attr
+                # and just give it back its own logger?
+ func_args = [name, self.cfg,
+ cc, config.LOG, args]
+ # Mark it as having started running
+ which_ran.append(name)
+ # This name will affect the semaphore name created
+ run_name = "config-%s" % (name)
+ cc.run(run_name, mod.handle, func_args, freq=freq)
+ except Exception as e:
+ util.logexc(LOG, "Running %s (%s) failed", name, mod)
+ failures.append((name, e))
+ return (which_ran, failures)
+
+ def run_single(self, mod_name, args=None, freq=None):
+        # Form the user's module 'spec'
+ mod_to_be = {
+ 'mod': mod_name,
+ 'args': args,
+ 'freq': freq,
+ }
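+        # (Illustrative usage: run_single('ssh') or
+        #  run_single('cc_ssh', args=['arg1'], freq=PER_INSTANCE);
+        #  the raw name given is normalized in _fixup_modules below.)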
+ # Now resume doing the normal fixups and running
+ raw_mods = [mod_to_be]
+ mostly_mods = self._fixup_modules(raw_mods)
+ return self._run_modules(mostly_mods)
+
+ def run_section(self, section_name):
+ raw_mods = self._read_modules(section_name)
+ mostly_mods = self._fixup_modules(raw_mods)
+ return self._run_modules(mostly_mods)