summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cloudinit/cloud.py482
1 files changed, 11 insertions, 471 deletions
diff --git a/cloudinit/cloud.py b/cloudinit/cloud.py
index 80d4f1ce..765e7d3a 100644
--- a/cloudinit/cloud.py
+++ b/cloudinit/cloud.py
@@ -20,158 +20,23 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-from time import time
-
-import cPickle as pickle
-
-import contextlib
-import copy
-import os
-import sys
-import weakref
-
-from cloudinit.settings import (PER_INSTANCE, PER_ALWAYS)
-from cloudinit.settings import (OLD_CLOUD_CONFIG, CLOUD_CONFIG)
-
-from cloudinit import (get_builtin_cfg, get_base_cfg)
+from cloudinit import distros
+from cloudinit import helpers
from cloudinit import log as logging
-from cloudinit import sources
-from cloudinit import util
-from cloudinit import handlers
-
-from cloudinit import user_data as ud
-from cloudinit.user_data import boot_hook as bh_part
-from cloudinit.user_data import cloud_config as cc_part
-from cloudinit.user_data import processor as ud_proc
-from cloudinit.user_data import shell_script as ss_part
-from cloudinit.user_data import upstart_job as up_part
LOG = logging.getLogger(__name__)
-class CloudSemaphores(object):
- def __init__(self, sem_path):
- self.sem_path = sem_path
-
- # acquire lock on 'name' for given 'freq' and run function 'func'
- # if 'clear_on_fail' is True and 'func' throws an exception
- # then remove the lock (so it would run again)
- def run_functor(self, name, freq, functor, args=None, clear_on_fail=False):
- if not args:
- args = []
- if self.has_run(name, freq):
- LOG.debug("%s already ran %s", name, freq)
- return False
- with self.lock(name, freq, clear_on_fail) as lock:
- if not lock:
- raise RuntimeError("Failed to acquire lock on %s" % name)
- else:
- LOG.debug("Running %s with args %s using lock %s", func, args, lock)
- func(*args)
- return True
-
- @contextlib.contextmanager
- def lock(self, name, freq, clear_on_fail=False):
- try:
- yield self._acquire(name, freq)
- except:
- if clear_on_fail:
- self.clear(name, freq)
- raise
-
- def clear(self, name, freq):
- sem_file = self._getpath(name, freq)
- try:
- util.del_file(sem_file)
- except IOError:
- return False
- return True
-
- def _acquire(self, name, freq):
- if self.has_run(name, freq):
- return None
- # This is a race condition since nothing atomic is happening
- # here, but this should be ok due to the nature of when
- # and where cloud-init runs... (file writing is not a lock..)
- sem_file = self._getpath(name, freq)
- contents = "%s: %s\n" % (os.getpid(), time())
- try:
- util.write_file(sem_file, contents)
- except (IOError, OSError):
- return None
- return sem_file
-
- def has_run(self, name, freq):
- if freq == PER_ALWAYS:
- return False
- sem_file = self._get_path(name, freq)
- if os.path.exists(sem_file):
- return True
- return False
-
- def _get_path(self, name, freq):
- sem_path = self.sem_path
- if freq == PER_INSTANCE:
- return os.path.join(sem_path, name)
- return os.path.join(sem_path, "%s.%s" % (name, freq))
-
-
-class CloudPaths(object):
- def __init__(self, sys_info):
- self.cloud_dir = sys_info['cloud_dir']
- self.instance_link = os.path.join(self.cloud_dir, 'instance')
- self.boot_finished = os.path.join(self.instance_link, "boot-finished")
- self.upstart_conf_d = sys_info.get('upstart_dir')
- self.template_dir = sys_info['templates_dir']
- self.seed_dir = os.path.join(self.cloud_dir, 'seed')
- self.datasource = None
- self.lookups = {
- "handlers": "handlers",
- "scripts": "scripts",
- "sem": "sem",
- "boothooks": "boothooks",
- "userdata_raw": "user-data.txt",
- "userdata": "user-data.txt.i",
- "obj_pkl": "obj.pkl",
- "cloud_config": "cloud-config.txt",
- "data": "data",
- }
-
- # get_ipath_cur: get the current instance path for an item
- def get_ipath_cur(self, name=None):
- ipath = os.path.join(self.cloud_dir, 'instance')
- add_on = self.lookups.get(name)
- if add_on:
- ipath = os.path.join(ipath, add_on)
- return ipath
-
- # get_cpath : get the "clouddir" (/var/lib/cloud/<name>)
- # for a name in dirmap
- def get_cpath(self, name=None):
- cpath = self.var_dir
- add_on = self.lookups.get(name)
- if add_on:
- cpath = os.path.join(cpath, add_on)
- return cpath
-
- # get_ipath : get the instance path for a name in pathmap
- # (/var/lib/cloud/instances/<instance>/<name>)
- def get_ipath(self, name=None):
- if not self.datasource:
- raise RuntimeError("Unable to get instance path, datasource not available/set.")
- iid = self.datasource.get_instance_id()
- ipath = os.path.join(self.cloud_dir, 'instances', iid)
- add_on = self.lookups.get(name)
- if add_on:
- ipath = os.path.join(ipath, add_on)
- return ipath
-
+class Cloud(object):
+ def __init__(self, datasource, paths, cfg):
+ self.datasource = datasource
+ self.paths = paths
+ self.cfg = cfg
+ self.distro = distros.fetch(cfg, self)
+ self.runners = helpers.Runners(paths)
-class CloudSimple(object):
- def __init__(self, ci):
- self.datasource = init.datasource
- self.paths = init.paths
- self.cfg = copy.deepcopy(ci.cfg)
+ def run(self, name, functor, args, freq=None, clear_on_fail=False):
+ return self.runners.run(name, functor, args, freq, clear_on_fail)
def get_userdata(self):
return self.datasource.get_userdata()
@@ -199,328 +64,3 @@ class CloudSimple(object):
def get_ipath(self, name=None):
return self.paths.get_ipath(name)
-
-
-class CloudInit(object):
- def __init__(self, ds_deps=None):
- self.datasource = None
- if ds_deps:
- self.ds_deps = ds_deps
- else:
- self.ds_deps = [sources.DEP_FILESYSTEM, sources.DEP_NETWORK]
- self.cfg = self._read_cfg()
- self.paths = CloudPaths(self.cfg['system_info'])
-
- def _read_cfg_old(self):
- # support reading the old ConfigObj format file and merging
- # it into the yaml dictionary
- try:
- from configobj import ConfigObj
- except ImportError:
- ConfigObj = None
- if not ConfigObj:
- return {}
- old_cfg = ConfigObj(OLD_CLOUD_CONFIG)
- return dict(old_cfg)
-
- def _initial_subdirs(self):
- c_dir = self.paths.cloud_dir
- initial_dirs = [
- os.path.join(c_dir, 'scripts'),
- os.path.join(c_dir, 'scripts', 'per-instance'),
- os.path.join(c_dir, 'scripts', 'per-once'),
- os.path.join(c_dir, 'scripts', 'per-boot'),
- os.path.join(c_dir, 'seed'),
- os.path.join(c_dir, 'instances'),
- os.path.join(c_dir, 'handlers'),
- os.path.join(c_dir, 'sem'),
- os.path.join(c_dir, 'data'),
- ]
- return initial_dirs
-
- def purge_cache(self, rmcur=True):
- rmlist = []
- rmlist.append(self.paths.boot_finished)
- if rmcur:
- rmlist.append(self.paths.instance_link)
- for f in rmlist:
- util.unlink(f)
- return len(rmlist)
-
- def init_fs(self):
- util.ensure_dirs(self._initial_subdirs())
- log_file = util.get_cfg_option_str(self.cfg, 'def_log_file', None)
- perms = util.get_cfg_option_str(self.cfg, 'syslog_fix_perms', None)
- if log_file:
- util.ensure_file(log_file)
- if perms:
- (u, g) = perms.split(':', 1)
- if u == "-1" or u == "None":
- u = None
- if g == "-1" or g == "None":
- g = None
- util.chownbyname(log_file, u, g)
-
- def _read_cfg(self):
- starting_config = get_builtin_cfg()
- try:
- conf = get_base_cfg(CLOUD_CONFIG, starting_config)
- except Exception:
- conf = starting_config
- old_conf = self._read_cfg_old()
- conf = util.mergedict(conf, old_conf)
- return conf
-
- def _restore_from_cache(self):
- pickled_fn = self.paths.get_ipath_cur('obj_pkl')
- try:
- # we try to restore from a current link and static path
- # by using the instance link, if purge_cache was called
- # the file wont exist
- return pickle.loads(util.load_file(pickled_fn))
- except Exception as e:
- LOG.debug("Failed loading pickled datasource from %s due to %s", pickled_fn, e)
- return False
-
- def write_to_cache(self):
- pickled_fn = self.paths.get_ipath_cur("obj_pkl")
- try:
- contents = pickle.dumps(self.datasource)
- util.write_file(pickled_fn, contents, mode=0400)
- except Exception as e:
- LOG.debug("Failed pickling datasource to %s due to: %s", pickled_fn, e)
- return False
-
- def _get_processor(self):
- return ud_proc.UserDataProcessor(self.paths)
-
- def _get_datasources(self):
- # Any config provided???
- pkg_list = self.cfg.get('datasource_pkg_list') or []
- # Add the defaults at the end
- for n in [util.obj_name(sources), '']:
- if n not in pkg_list:
- pkg_list.append(n)
- cfg_list = self.cfg.get('datasource_list') or []
- return (cfg_list, pkg_list)
-
- def get_data_source(self):
- if self.datasource:
- return True
- ds = self._restore_from_cache()
- if ds:
- LOG.debug("Restored from cache datasource: %s" % ds)
- else:
- (cfg_list, pkg_list) = self._get_datasources()
- ud_proc = self._get_processor()
- (ds, dsname) = sources.find_source(self.cfg,
- self.ds_deps,
- cfg_list=cfg_list,
- pkg_list=pkg_list,
- ud_proc=ud_proc)
- LOG.debug("Loaded datasource %s - %s", dsname, ds)
- self.datasource = ds
- # This allows the paths obj to have an ipath function that works
- self.paths.datasource = ds
- return True
-
- def set_cur_instance(self):
- # Ensure we are hooked into the right symlink for the current instance
- idir = self.paths.get_ipath()
- util.del_file(self.paths.instance_link)
- util.sym_link(idir, self.paths.instance_link)
-
- dlist = []
- for d in ["handlers", "scripts", "sem"]:
- dlist.append(os.path.join(idir, d))
- util.ensure_dirs(dlist)
-
- # Write out information on what is being used for the current instance
- # and what may have been used for a previous instance...
- dp = self.paths.get_cpath('data')
- ds = "%s: %s\n" % (self.datasource.__class__, self.datasource)
- previous_ds = ''
- ds_fn = os.path.join(idir, 'datasource')
- try:
- previous_ds = util.load_file(ds_fn).strip()
- except IOError as e:
- pass
- if not previous_ds:
- # TODO: ?? is this right
- previous_ds = ds
- util.write_file(ds_fn, ds)
- util.write_file(os.path.join(dp, 'previous-datasource'), previous_ds)
- iid = self.datasource.get_instance_id()
- previous_iid = ''
- p_iid_fn = os.path.join(dp, 'previous-instance-id')
- try:
- previous_iid = util.load_file(p_iid_fn).strip()
- except IOError as e:
- pass
- if not previous_iid:
- # TODO: ?? is this right
- previous_iid = iid
- util.write_file(p_iid_fn, "%s\n" % previous_iid)
-
- def update_cache(self):
- self.write_to_cache()
- self.store_userdata()
-
- def store_userdata(self):
- raw_ud = "%s" % (self.datasource.get_userdata_raw())
- util.write_file(self.paths.get_ipath('userdata_raw'), raw_ud, 0600)
- ud = "%s" % (self.datasource.get_userdata())
- util.write_file(self.paths.get_ipath('userdata'), ud, 0600)
-
- def consume_userdata(self, frequency=PER_INSTANCE):
- cdir = self.paths.get_cpath("handlers")
- idir = self.paths.get_ipath("handlers")
-
- # Add the path to the plugins dir to the top of our list for import
- # instance dir should be read before cloud-dir
- sys.path.insert(0, cdir)
- sys.path.insert(0, idir)
-
- # Data will be a little proxy that modules can use
- data = CloudSimple(self)
-
- # This keeps track of all the active handlers
- handlers = CloudHandlers(self)
-
- # Add handlers in cdir
- potential_handlers = utils.find_modules(cdir)
- for (fname, modname) in potential_handlers.iteritems():
- try:
- mod = parts.fixup_module(importer.import_module(modname))
- types = handlers.register(mod)
- LOG.debug("Added handler for [%s] from %s", types, fname)
- except:
- LOG.exception("Failed to register handler from %s", fname)
-
- def_handlers = handlers.register_defaults()
- if def_handlers:
- LOG.debug("Registered default handlers for [%s]", def_handlers)
-
- # Init the handlers first
- # Ensure userdata fetched before activation
- called = []
- for (_mtype, mod) in handlers.iteritems():
- if mod in called:
- continue
- parts.call_begin(mod, data, frequency)
- called.append(mod)
-
- # Walk the user data
- part_data = {
- 'handlers': handlers,
- 'handlerdir': idir,
- 'data': data,
- 'frequency': frequency,
- 'handlercount': 0,
- }
- ud.walk(data.get_userdata(), parts.walker_callback, data=part_data)
-
- # Give callbacks opportunity to finalize
- called = []
- for (_mtype, mod) in handlers.iteritems():
- if mod in called:
- continue
- parts.call_end(mod, data, frequency)
- called.append(mod)
-
-
-class CloudHandlers(object):
-
- def __init__(self, paths):
- self.paths = paths
- self.registered = {}
-
- def __contains__(self, item):
- return self.is_registered(item)
-
- def __getitem__(self, key):
- return self._get_handler(key)
-
- def is_registered(self, content_type):
- return content_type in self.registered
-
- def register(self, mod):
- types = set()
- for t in mod.list_types():
- self.registered[t] = handler
- types.add(t)
- return types
-
- def _get_handler(self, content_type):
- return self.registered[content_type]
-
- def items(self):
- return self.registered.items()
-
- def iteritems(self):
- return self.registered.iteritems()
-
- def _get_default_handlers(self):
- def_handlers = []
- if self.paths.get_ipath("cloud_config"):
- def_handlers.append(cc_part.CloudConfigPartHandler(self.paths.get_ipath("cloud_config")))
- if self.paths.get_ipath_cur('scripts'):
- def_handlers.append(ss_part.ShellScriptPartHandler(self.paths.get_ipath_cur('scripts')))
- if self.paths.get_ipath("boothooks"):
- def_handlers.append(bh_part.BootHookPartHandler(self.paths.get_ipath("boothooks")))
- if self.paths.upstart_conf_d:
- def_handlers.append(up_part.UpstartJobPartHandler(self.paths.upstart_conf_d))
- return def_handlers
-
- def register_defaults(self):
- registered = set()
- for h in self._get_default_handlers():
- for t in h.list_types():
- if not self.is_registered(t)
- self.register_handler(t, h)
- registered.add(t)
- return registered
-
-
-class CloudConfig(object):
- def __init__(self, cfgfile, cloud):
- self.cloud = cloud
- self.cfg = self._get_config(cfgfile)
- self.paths = cloud.paths
- self.sems = CloudSemaphores(self.paths.get_ipath("sem"))
-
- def _get_config(self, cfgfile):
- cfg = None
- try:
- cfg = util.read_conf(cfgfile)
- except:
- LOG.exception(("Failed loading of cloud config '%s'. "
- "Continuing with empty config."), cfgfile)
- if not cfg:
- cfg = {}
-
- ds_cfg = None
- try:
- ds_cfg = self.cloud.datasource.get_config_obj()
- except:
- LOG.exception("Failed loading of datasource config.")
- if not ds_cfg:
- ds_cfg = {}
-
- cfg = util.mergedict(cfg, ds_cfg)
- cloud_cfg = self.cloud.cfg or {}
- return util.mergedict(cfg, cloud_cfg)
-
- def extract(self, name):
- modname = handlers.form_module_name(name)
- if not modname:
- return None
- return handlers.fixup_module(importer.import_module(modname))
-
- def handle(self, name, mod, args, freq=None):
- def_freq = mod.frequency
- if not freq:
- freq = def_freq
- c_name = "config-%s" % (name)
- real_args = [name, copy.deepcopy(self.cfg), CloudSimple(self.cloud), LOG, copy.deepcopy(args)]
- return self.sems.run_functor(c_name, freq, mod.handle, real_args)