Diffstat (limited to 'cloudinit/stages.py')
-rw-r--r-- | cloudinit/stages.py | 766
1 file changed, 507 insertions, 259 deletions
diff --git a/cloudinit/stages.py b/cloudinit/stages.py
index 0cce6e80..3f17294b 100644
--- a/cloudinit/stages.py
+++ b/cloudinit/stages.py
@@ -8,34 +8,34 @@
 import copy
 import os
 import pickle
 import sys
+from collections import namedtuple
+from typing import Dict, Set  # noqa: F401
 
-from cloudinit.settings import (
-    FREQUENCIES, CLOUD_CONFIG, PER_INSTANCE, RUN_CLOUD_CONFIG)
-
-from cloudinit import handlers
+from cloudinit import cloud, config, distros, handlers, helpers, importer
+from cloudinit import log as logging
+from cloudinit import net, sources, type_utils, util
+from cloudinit.event import EventScope, EventType, userdata_to_events
 
 # Default handlers (used if not overridden)
 from cloudinit.handlers.boot_hook import BootHookPartHandler
 from cloudinit.handlers.cloud_config import CloudConfigPartHandler
 from cloudinit.handlers.jinja_template import JinjaTemplatePartHandler
 from cloudinit.handlers.shell_script import ShellScriptPartHandler
+from cloudinit.handlers.shell_script_by_frequency import (
+    ShellScriptByFreqPartHandler,
+)
 from cloudinit.handlers.upstart_job import UpstartJobPartHandler
-
-from cloudinit.event import EventType
-from cloudinit.sources import NetworkConfigSource
-
-from cloudinit import cloud
-from cloudinit import config
-from cloudinit import distros
-from cloudinit import helpers
-from cloudinit import importer
-from cloudinit import log as logging
-from cloudinit import net
 from cloudinit.net import cmdline
 from cloudinit.reporting import events
-from cloudinit import sources
-from cloudinit import type_utils
-from cloudinit import util
+from cloudinit.settings import (
+    CLOUD_CONFIG,
+    FREQUENCIES,
+    PER_ALWAYS,
+    PER_INSTANCE,
+    PER_ONCE,
+    RUN_CLOUD_CONFIG,
+)
+from cloudinit.sources import NetworkConfigSource
 
 LOG = logging.getLogger(__name__)
 
@@ -43,6 +43,60 @@ NULL_DATA_SOURCE = None
 NO_PREVIOUS_INSTANCE_ID = "NO_PREVIOUS_INSTANCE_ID"
 
 
+def update_event_enabled(
+    datasource: sources.DataSource,
+    cfg: dict,
+    event_source_type: EventType,
+    scope: EventScope = None,
+) -> bool:
+    """Determine if a particular EventType is enabled.
+
+    For the `event_source_type` passed in, check whether this EventType
+    is enabled in the `updates` section of the userdata. If `updates`
+    is not enabled in userdata, check if defined as one of the
+    `default_events` on the datasource. `scope` may be used to
+    narrow the check to a particular `EventScope`.
+
+    Note that on first boot, userdata may NOT be available yet. In this
+    case, we only have the data source's `default_update_events`,
+    so an event that should be enabled in userdata may be denied.
+    """
+    default_events = (
+        datasource.default_update_events
+    )  # type: Dict[EventScope, Set[EventType]]
+    user_events = userdata_to_events(
+        cfg.get("updates", {})
+    )  # type: Dict[EventScope, Set[EventType]]
+    # A value in the first will override a value in the second
+    allowed = util.mergemanydict(
+        [
+            copy.deepcopy(user_events),
+            copy.deepcopy(default_events),
+        ]
+    )
+    LOG.debug("Allowed events: %s", allowed)
+
+    if not scope:
+        scopes = allowed.keys()
+    else:
+        scopes = [scope]
+    scope_values = [s.value for s in scopes]
+
+    for evt_scope in scopes:
+        if event_source_type in allowed.get(evt_scope, []):
+            LOG.debug(
+                "Event Allowed: scope=%s EventType=%s",
+                evt_scope.value,
+                event_source_type,
+            )
+            return True
+
+    LOG.debug(
+        "Event Denied: scopes=%s EventType=%s", scope_values, event_source_type
+    )
+    return False
+
+
 class Init(object):
     def __init__(self, ds_deps=None, reporter=None):
         if ds_deps is not None:
@@ -60,8 +114,10 @@ class Init(object):
 
         if reporter is None:
             reporter = events.ReportEventStack(
-                name="init-reporter", description="init-desc",
-                reporting_enabled=False)
+                name="init-reporter",
+                description="init-desc",
+                reporting_enabled=False,
+            )
         self.reporter = reporter
 
     def _reset(self, reset_ds=False):
@@ -77,8 +133,8 @@ class Init(object):
     def distro(self):
         if not self._distro:
             # Try to find the right class to use
-            system_config = self._extract_cfg('system')
-            distro_name = system_config.pop('distro', 'ubuntu')
+            system_config = self._extract_cfg("system")
+            distro_name = system_config.pop("distro", "ubuntu")
             distro_cls = distros.fetch(distro_name)
             LOG.debug("Using distro class %s", distro_cls)
             self._distro = distro_cls(distro_name, system_config, self.paths)
@@ -92,19 +148,19 @@ class Init(object):
 
     @property
     def cfg(self):
-        return self._extract_cfg('restricted')
+        return self._extract_cfg("restricted")
 
     def _extract_cfg(self, restriction):
         # Ensure actually read
         self.read_cfg()
         # Nobody gets the real config
         ocfg = copy.deepcopy(self._cfg)
-        if restriction == 'restricted':
-            ocfg.pop('system_info', None)
-        elif restriction == 'system':
-            ocfg = util.get_cfg_by_path(ocfg, ('system_info',), {})
-        elif restriction == 'paths':
-            ocfg = util.get_cfg_by_path(ocfg, ('system_info', 'paths'), {})
+        if restriction == "restricted":
+            ocfg.pop("system_info", None)
+        elif restriction == "system":
+            ocfg = util.get_cfg_by_path(ocfg, ("system_info",), {})
+        elif restriction == "paths":
+            ocfg = util.get_cfg_by_path(ocfg, ("system_info", "paths"), {})
         if not isinstance(ocfg, (dict)):
             ocfg = {}
         return ocfg
@@ -112,24 +168,26 @@ class Init(object):
     @property
     def paths(self):
         if not self._paths:
-            path_info = self._extract_cfg('paths')
+            path_info = self._extract_cfg("paths")
             self._paths = helpers.Paths(path_info, self.datasource)
         return self._paths
 
     def _initial_subdirs(self):
         c_dir = self.paths.cloud_dir
+        run_dir = self.paths.run_dir
         initial_dirs = [
             c_dir,
-            os.path.join(c_dir, 'scripts'),
-            os.path.join(c_dir, 'scripts', 'per-instance'),
-            os.path.join(c_dir, 'scripts', 'per-once'),
-            os.path.join(c_dir, 'scripts', 'per-boot'),
-            os.path.join(c_dir, 'scripts', 'vendor'),
-            os.path.join(c_dir, 'seed'),
-            os.path.join(c_dir, 'instances'),
-            os.path.join(c_dir, 'handlers'),
-            os.path.join(c_dir, 'sem'),
-            os.path.join(c_dir, 'data'),
+            os.path.join(c_dir, "scripts"),
+            os.path.join(c_dir, "scripts", "per-instance"),
+            os.path.join(c_dir, "scripts", "per-once"),
+            os.path.join(c_dir, "scripts", "per-boot"),
+            os.path.join(c_dir, "scripts", "vendor"),
+            os.path.join(c_dir, "seed"),
+            os.path.join(c_dir, "instances"),
+            os.path.join(c_dir, "handlers"),
+            os.path.join(c_dir, "sem"),
+            os.path.join(c_dir, "data"),
+            os.path.join(run_dir, "sem"),
         ]
         return initial_dirs
 
@@ -146,10 +204,10 @@ class Init(object):
     def _initialize_filesystem(self):
         util.ensure_dirs(self._initial_subdirs())
 
-        log_file = util.get_cfg_option_str(self.cfg, 'def_log_file')
+        log_file = util.get_cfg_option_str(self.cfg, "def_log_file")
         if log_file:
-            util.ensure_file(log_file, preserve_mode=True)
-            perms = self.cfg.get('syslog_fix_perms')
+            util.ensure_file(log_file, mode=0o640, preserve_mode=True)
+            perms = self.cfg.get("syslog_fix_perms")
             if not perms:
                 perms = {}
             if not isinstance(perms, list):
@@ -164,8 +222,12 @@
             except OSError as e:
                 error = e
 
-            LOG.warning("Failed changing perms on '%s'. tried: %s. %s",
-                        log_file, ','.join(perms), error)
+            LOG.warning(
+                "Failed changing perms on '%s'. tried: %s. %s",
+                log_file,
+                ",".join(perms),
+                error,
+            )
 
     def read_cfg(self, extra_fns=None):
         # None check so that we don't keep on re-loading if empty
@@ -175,37 +237,41 @@ class Init(object):
 
     def _read_cfg(self, extra_fns):
         no_cfg_paths = helpers.Paths({}, self.datasource)
-        merger = helpers.ConfigMerger(paths=no_cfg_paths,
-                                      datasource=self.datasource,
-                                      additional_fns=extra_fns,
-                                      base_cfg=fetch_base_config())
+        merger = helpers.ConfigMerger(
+            paths=no_cfg_paths,
+            datasource=self.datasource,
+            additional_fns=extra_fns,
+            base_cfg=fetch_base_config(),
+        )
         return merger.cfg
 
     def _restore_from_cache(self):
         # We try to restore from a current link and static path
         # by using the instance link, if purge_cache was called
        # the file wont exist.
-        return _pkl_load(self.paths.get_ipath_cur('obj_pkl'))
+        return _pkl_load(self.paths.get_ipath_cur("obj_pkl"))
 
     def _write_to_cache(self):
         if self.datasource is NULL_DATA_SOURCE:
             return False
-        if util.get_cfg_option_bool(self.cfg, 'manual_cache_clean', False):
+        if util.get_cfg_option_bool(self.cfg, "manual_cache_clean", False):
            # The empty file in instance/ dir indicates manual cleaning,
            # and can be read by ds-identify.
            util.write_file(
                self.paths.get_ipath_cur("manual_clean_marker"),
-                omode="w", content="")
+                omode="w",
+                content="",
+            )
         return _pkl_store(self.datasource, self.paths.get_ipath_cur("obj_pkl"))
 
     def _get_datasources(self):
         # Any config provided???
-        pkg_list = self.cfg.get('datasource_pkg_list') or []
+        pkg_list = self.cfg.get("datasource_pkg_list") or []
         # Add the defaults at the end
-        for n in ['', type_utils.obj_name(sources)]:
+        for n in ["", type_utils.obj_name(sources)]:
             if n not in pkg_list:
                 pkg_list.append(n)
-        cfg_list = self.cfg.get('datasource_list') or []
+        cfg_list = self.cfg.get("datasource_list") or []
         return (cfg_list, pkg_list)
 
     def _restore_from_checked_cache(self, existing):
@@ -216,7 +282,7 @@ class Init(object):
         if not ds:
             return (None, "no cache found")
 
-        run_iid_fn = self.paths.get_runpath('instance_id')
+        run_iid_fn = self.paths.get_runpath("instance_id")
         if os.path.exists(run_iid_fn):
             run_iid = util.load_file(run_iid_fn).strip()
         else:
@@ -227,20 +293,22 @@
         elif existing == "trust":
             return (ds, "restored from cache: %s" % ds)
         else:
-            if (hasattr(ds, 'check_instance_id') and
-                    ds.check_instance_id(self.cfg)):
+            if hasattr(ds, "check_instance_id") and ds.check_instance_id(
+                self.cfg
+            ):
                 return (ds, "restored from checked cache: %s" % ds)
             else:
                 return (None, "cache invalid in datasource: %s" % ds)
 
-    def _get_data_source(self, existing):
+    def _get_data_source(self, existing) -> sources.DataSource:
         if self.datasource is not NULL_DATA_SOURCE:
             return self.datasource
 
         with events.ReportEventStack(
-                name="check-cache",
-                description="attempting to read from cache [%s]" % existing,
-                parent=self.reporter) as myrep:
+            name="check-cache",
+            description="attempting to read from cache [%s]" % existing,
+            parent=self.reporter,
+        ) as myrep:
 
             ds, desc = self._restore_from_checked_cache(existing)
             myrep.description = desc
@@ -252,21 +320,24 @@ class Init(object):
             (cfg_list, pkg_list) = self._get_datasources()
             # Deep copy so that user-data handlers can not modify
             # (which will affect user-data handlers down the line...)
-            (ds, dsname) = sources.find_source(self.cfg,
-                                               self.distro,
-                                               self.paths,
-                                               copy.deepcopy(self.ds_deps),
-                                               cfg_list,
-                                               pkg_list, self.reporter)
+            (ds, dsname) = sources.find_source(
+                self.cfg,
+                self.distro,
+                self.paths,
+                copy.deepcopy(self.ds_deps),
+                cfg_list,
+                pkg_list,
+                self.reporter,
+            )
             LOG.info("Loaded datasource %s - %s", dsname, ds)
-        self.datasource = ds
+        self.datasource = ds  # type: sources.DataSource
         # Ensure we adjust our path members datasource
         # now that we have one (thus allowing ipath to be used)
         self._reset()
         return ds
 
     def _get_instance_subdirs(self):
-        return ['handlers', 'scripts', 'sem']
+        return ["handlers", "scripts", "sem"]
 
     def _get_ipath(self, subname=None):
         # Force a check to see if anything
@@ -274,8 +345,10 @@
         # then a datasource has not been assigned...
         instance_dir = self.paths.get_ipath(subname)
         if not instance_dir:
-            raise RuntimeError(("No instance directory is available."
-                                " Has a datasource been fetched??"))
+            raise RuntimeError(
+                "No instance directory is available."
+                " Has a datasource been fetched??"
+            )
         return instance_dir
 
     def _reflect_cur_instance(self):
@@ -293,12 +366,12 @@
 
         # Write out information on what is being used for the current instance
         # and what may have been used for a previous instance...
-        dp = self.paths.get_cpath('data')
+        dp = self.paths.get_cpath("data")
 
         # Write what the datasource was and is..
         ds = "%s: %s" % (type_utils.obj_name(self.datasource), self.datasource)
         previous_ds = None
-        ds_fn = os.path.join(idir, 'datasource')
+        ds_fn = os.path.join(idir, "datasource")
         try:
             previous_ds = util.load_file(ds_fn).strip()
         except Exception:
@@ -306,18 +379,20 @@
         if not previous_ds:
             previous_ds = ds
         util.write_file(ds_fn, "%s\n" % ds)
-        util.write_file(os.path.join(dp, 'previous-datasource'),
-                        "%s\n" % (previous_ds))
+        util.write_file(
+            os.path.join(dp, "previous-datasource"), "%s\n" % (previous_ds)
+        )
 
         # What the instance id was and is...
         iid = self.datasource.get_instance_id()
-        iid_fn = os.path.join(dp, 'instance-id')
+        iid_fn = os.path.join(dp, "instance-id")
 
         previous_iid = self.previous_iid()
         util.write_file(iid_fn, "%s\n" % iid)
-        util.write_file(self.paths.get_runpath('instance_id'), "%s\n" % iid)
-        util.write_file(os.path.join(dp, 'previous-instance-id'),
-                        "%s\n" % (previous_iid))
+        util.write_file(self.paths.get_runpath("instance_id"), "%s\n" % iid)
+        util.write_file(
+            os.path.join(dp, "previous-instance-id"), "%s\n" % (previous_iid)
+        )
 
         self._write_to_cache()
         # Ensure needed components are regenerated
@@ -330,8 +405,8 @@
         if self._previous_iid is not None:
             return self._previous_iid
 
-        dp = self.paths.get_cpath('data')
-        iid_fn = os.path.join(dp, 'instance-id')
+        dp = self.paths.get_cpath("data")
+        iid_fn = os.path.join(dp, "instance-id")
         try:
             self._previous_iid = util.load_file(iid_fn).strip()
         except Exception:
@@ -341,9 +416,16 @@
         return self._previous_iid
 
     def is_new_instance(self):
+        """Return true if this is a new instance.
+
+        If datasource has already been initialized, this will return False,
+        even on first boot.
+        """
         previous = self.previous_iid()
-        ret = (previous == NO_PREVIOUS_INSTANCE_ID or
-               previous != self.datasource.get_instance_id())
+        ret = (
+            previous == NO_PREVIOUS_INSTANCE_ID
+            or previous != self.datasource.get_instance_id()
+        )
         return ret
 
     def fetch(self, existing="check"):
@@ -354,75 +436,102 @@
 
     def cloudify(self):
         # Form the needed options to cloudify our members
-        return cloud.Cloud(self.datasource,
-                           self.paths, self.cfg,
-                           self.distro, helpers.Runners(self.paths),
-                           reporter=self.reporter)
+        return cloud.Cloud(
+            self.datasource,
+            self.paths,
+            self.cfg,
+            self.distro,
+            helpers.Runners(self.paths),
+            reporter=self.reporter,
+        )
 
     def update(self):
-        self._store_userdata()
-        self._store_vendordata()
+        self._store_rawdata(self.datasource.get_userdata_raw(), "userdata")
+        self._store_processeddata(self.datasource.get_userdata(), "userdata")
+        self._store_raw_vendordata(
+            self.datasource.get_vendordata_raw(), "vendordata"
+        )
+        self._store_processeddata(
+            self.datasource.get_vendordata(), "vendordata"
+        )
+        self._store_raw_vendordata(
+            self.datasource.get_vendordata2_raw(), "vendordata2"
+        )
+        self._store_processeddata(
+            self.datasource.get_vendordata2(), "vendordata2"
+        )
 
     def setup_datasource(self):
-        with events.ReportEventStack("setup-datasource",
-                                     "setting up datasource",
-                                     parent=self.reporter):
+        with events.ReportEventStack(
+            "setup-datasource", "setting up datasource", parent=self.reporter
+        ):
            if self.datasource is None:
                raise RuntimeError("Datasource is None, cannot setup.")
            self.datasource.setup(is_new_instance=self.is_new_instance())
 
    def activate_datasource(self):
-        with events.ReportEventStack("activate-datasource",
-                                     "activating datasource",
-                                     parent=self.reporter):
+        with events.ReportEventStack(
+            "activate-datasource",
+            "activating datasource",
+            parent=self.reporter,
+        ):
            if self.datasource is None:
                raise RuntimeError("Datasource is None, cannot activate.")
-            self.datasource.activate(cfg=self.cfg,
-                                     is_new_instance=self.is_new_instance())
+            self.datasource.activate(
+                cfg=self.cfg, is_new_instance=self.is_new_instance()
+            )
        self._write_to_cache()
 
-    def _store_userdata(self):
-        raw_ud = self.datasource.get_userdata_raw()
-        if raw_ud is None:
-            raw_ud = b''
-        util.write_file(self._get_ipath('userdata_raw'), raw_ud, 0o600)
-        # processed userdata is a Mime message, so write it as string.
-        processed_ud = self.datasource.get_userdata()
-        if processed_ud is None:
-            raw_ud = ''
-        util.write_file(self._get_ipath('userdata'), str(processed_ud), 0o600)
-
-    def _store_vendordata(self):
-        raw_vd = self.datasource.get_vendordata_raw()
-        if raw_vd is None:
-            raw_vd = b''
-        util.write_file(self._get_ipath('vendordata_raw'), raw_vd, 0o600)
-        # processed vendor data is a Mime message, so write it as string.
-        processed_vd = str(self.datasource.get_vendordata())
-        if processed_vd is None:
-            processed_vd = ''
-        util.write_file(self._get_ipath('vendordata'), str(processed_vd),
-                        0o600)
+    def _store_rawdata(self, data, datasource):
+        # Raw data is bytes, not a string
+        if data is None:
+            data = b""
+        util.write_file(self._get_ipath("%s_raw" % datasource), data, 0o600)
+
+    def _store_raw_vendordata(self, data, datasource):
+        # Only these data types
+        if data is not None and type(data) not in [bytes, str, list]:
+            raise TypeError(
+                "vendordata_raw is unsupported type '%s'" % str(type(data))
+            )
+        # This data may be a list, convert it to a string if so
+        if isinstance(data, list):
+            data = util.json_dumps(data)
+        self._store_rawdata(data, datasource)
+
+    def _store_processeddata(self, processed_data, datasource):
+        # processed is a Mime message, so write as string.
+        if processed_data is None:
+            processed_data = ""
+        util.write_file(
+            self._get_ipath(datasource), str(processed_data), 0o600
+        )
 
     def _default_handlers(self, opts=None):
         if opts is None:
             opts = {}
 
-        opts.update({
-            'paths': self.paths,
-            'datasource': self.datasource,
-        })
+        opts.update(
+            {
+                "paths": self.paths,
+                "datasource": self.datasource,
+            }
+        )
         # TODO(harlowja) Hmmm, should we dynamically import these??
         cloudconfig_handler = CloudConfigPartHandler(**opts)
         shellscript_handler = ShellScriptPartHandler(**opts)
         def_handlers = [
             cloudconfig_handler,
             shellscript_handler,
+            ShellScriptByFreqPartHandler(PER_ALWAYS, **opts),
+            ShellScriptByFreqPartHandler(PER_INSTANCE, **opts),
+            ShellScriptByFreqPartHandler(PER_ONCE, **opts),
             BootHookPartHandler(**opts),
             UpstartJobPartHandler(**opts),
         ]
         opts.update(
-            {'sub_handlers': [cloudconfig_handler, shellscript_handler]})
+            {"sub_handlers": [cloudconfig_handler, shellscript_handler]}
+        )
         def_handlers.append(JinjaTemplatePartHandler(**opts))
         return def_handlers
 
@@ -431,11 +540,23 @@
 
     def _default_vendordata_handlers(self):
         return self._default_handlers(
-            opts={'script_path': 'vendor_scripts',
-                  'cloud_config_path': 'vendor_cloud_config'})
+            opts={
+                "script_path": "vendor_scripts",
+                "cloud_config_path": "vendor_cloud_config",
+            }
+        )
+
+    def _default_vendordata2_handlers(self):
+        return self._default_handlers(
+            opts={
+                "script_path": "vendor_scripts",
+                "cloud_config_path": "vendor2_cloud_config",
+            }
+        )
 
-    def _do_handlers(self, data_msg, c_handlers_list, frequency,
-                     excluded=None):
+    def _do_handlers(
+        self, data_msg, c_handlers_list, frequency, excluded=None
+    ):
         """
         Generalized handlers suitable for use with either vendordata
         or userdata
@@ -462,21 +583,31 @@
             for (fname, mod_name) in potential_handlers.items():
                 try:
                     mod_locs, looked_locs = importer.find_module(
-                        mod_name, [''], ['list_types', 'handle_part'])
+                        mod_name, [""], ["list_types", "handle_part"]
+                    )
                     if not mod_locs:
-                        LOG.warning("Could not find a valid user-data handler"
-                                    " named %s in file %s (searched %s)",
-                                    mod_name, fname, looked_locs)
+                        LOG.warning(
+                            "Could not find a valid user-data handler"
+                            " named %s in file %s (searched %s)",
+                            mod_name,
+                            fname,
+                            looked_locs,
+                        )
                         continue
                     mod = importer.import_module(mod_locs[0])
                     mod = handlers.fixup_handler(mod)
                     types = c_handlers.register(mod)
                     if types:
-                        LOG.debug("Added custom handler for %s [%s] from %s",
-                                  types, mod, fname)
+                        LOG.debug(
+                            "Added custom handler for %s [%s] from %s",
+                            types,
+                            mod,
+                            fname,
+                        )
                 except Exception:
-                    util.logexc(LOG, "Failed to register handler from %s",
-                                fname)
+                    util.logexc(
+                        LOG, "Failed to register handler from %s", fname
+                    )
 
         # This keeps track of all the active handlers
         c_handlers = helpers.ContentHandlers()
@@ -508,17 +639,17 @@
         def walk_handlers(excluded):
             # Walk the user data
             part_data = {
-                'handlers': c_handlers,
+                "handlers": c_handlers,
                 # Any new handlers that are encountered get writen here
-                'handlerdir': idir,
-                'data': data,
+                "handlerdir": idir,
+                "data": data,
                 # The default frequency if handlers don't have one
-                'frequency': frequency,
+                "frequency": frequency,
                 # This will be used when new handlers are found
                 # to help write their contents to files with numbered
                 # names...
-                'handlercount': 0,
-                'excluded': excluded,
+                "handlercount": 0,
+                "excluded": excluded,
             }
             handlers.walk(data_msg, handlers.walker_callback, data=part_data)
 
@@ -544,18 +675,29 @@
     def consume_data(self, frequency=PER_INSTANCE):
         # Consume the userdata first, because we need want to let the part
         # handlers run first (for merging stuff)
-        with events.ReportEventStack("consume-user-data",
-                                     "reading and applying user-data",
-                                     parent=self.reporter):
-            if util.get_cfg_option_bool(self.cfg, 'allow_userdata', True):
+        with events.ReportEventStack(
+            "consume-user-data",
+            "reading and applying user-data",
+            parent=self.reporter,
+        ):
+            if util.get_cfg_option_bool(self.cfg, "allow_userdata", True):
                 self._consume_userdata(frequency)
             else:
-                LOG.debug('allow_userdata = False: discarding user-data')
+                LOG.debug("allow_userdata = False: discarding user-data")
+
+        with events.ReportEventStack(
+            "consume-vendor-data",
+            "reading and applying vendor-data",
+            parent=self.reporter,
+        ):
+            self._consume_vendordata("vendordata", frequency)
 
-        with events.ReportEventStack("consume-vendor-data",
-                                     "reading and applying vendor-data",
-                                     parent=self.reporter):
-            self._consume_vendordata(frequency)
+        with events.ReportEventStack(
+            "consume-vendor-data2",
+            "reading and applying vendor-data2",
+            parent=self.reporter,
+        ):
+            self._consume_vendordata("vendordata2", frequency)
 
         # Perform post-consumption adjustments so that
         # modules that run during the init stage reflect
@@ -568,50 +710,75 @@
         # objects before the load of the userdata happened,
         # this is expected.
 
-    def _consume_vendordata(self, frequency=PER_INSTANCE):
+    def _consume_vendordata(self, vendor_source, frequency=PER_INSTANCE):
         """
         Consume the vendordata and run the part handlers on it
         """
+
         # User-data should have been consumed first.
         # So we merge the other available cloud-configs (everything except
         # vendor provided), and check whether or not we should consume
         # vendor data at all. That gives user or system a chance to override.
-        if not self.datasource.get_vendordata_raw():
-            LOG.debug("no vendordata from datasource")
-            return
-
-        _cc_merger = helpers.ConfigMerger(paths=self._paths,
-                                          datasource=self.datasource,
-                                          additional_fns=[],
-                                          base_cfg=self.cfg,
-                                          include_vendor=False)
-        vdcfg = _cc_merger.cfg.get('vendor_data', {})
+        if vendor_source == "vendordata":
+            if not self.datasource.get_vendordata_raw():
+                LOG.debug("no vendordata from datasource")
+                return
+            cfg_name = "vendor_data"
+        elif vendor_source == "vendordata2":
+            if not self.datasource.get_vendordata2_raw():
+                LOG.debug("no vendordata2 from datasource")
+                return
+            cfg_name = "vendor_data2"
+        else:
+            raise RuntimeError(
+                "vendor_source arg must be either 'vendordata'"
+                " or 'vendordata2'"
+            )
+
+        _cc_merger = helpers.ConfigMerger(
+            paths=self._paths,
+            datasource=self.datasource,
+            additional_fns=[],
+            base_cfg=self.cfg,
+            include_vendor=False,
+        )
+        vdcfg = _cc_merger.cfg.get(cfg_name, {})
 
         if not isinstance(vdcfg, dict):
-            vdcfg = {'enabled': False}
-            LOG.warning("invalid 'vendor_data' setting. resetting to: %s",
-                        vdcfg)
+            vdcfg = {"enabled": False}
+            LOG.warning(
+                "invalid %s setting. resetting to: %s", cfg_name, vdcfg
+            )
 
-        enabled = vdcfg.get('enabled')
-        no_handlers = vdcfg.get('disabled_handlers', None)
+        enabled = vdcfg.get("enabled")
+        no_handlers = vdcfg.get("disabled_handlers", None)
 
         if not util.is_true(enabled):
-            LOG.debug("vendordata consumption is disabled.")
+            LOG.debug("%s consumption is disabled.", vendor_source)
             return
 
-        LOG.debug("vendor data will be consumed. disabled_handlers=%s",
-                  no_handlers)
+        LOG.debug(
+            "%s will be consumed. disabled_handlers=%s",
+            vendor_source,
+            no_handlers,
+        )
 
-        # Ensure vendordata source fetched before activation (just incase)
-        vendor_data_msg = self.datasource.get_vendordata()
+        # Ensure vendordata source fetched before activation (just in case.)
 
-        # This keeps track of all the active handlers, while excluding what the
-        # users doesn't want run, i.e. boot_hook, cloud_config, shell_script
-        c_handlers_list = self._default_vendordata_handlers()
+        # c_handlers_list keeps track of all the active handlers, while
+        # excluding what the users doesn't want run, i.e. boot_hook,
+        # cloud_config, shell_script
+        if vendor_source == "vendordata":
+            vendor_data_msg = self.datasource.get_vendordata()
+            c_handlers_list = self._default_vendordata_handlers()
+        else:
+            vendor_data_msg = self.datasource.get_vendordata2()
+            c_handlers_list = self._default_vendordata2_handlers()
 
         # Run the handlers
-        self._do_handlers(vendor_data_msg, c_handlers_list, frequency,
-                          excluded=no_handlers)
+        self._do_handlers(
+            vendor_data_msg, c_handlers_list, frequency, excluded=no_handlers
+        )
 
     def _consume_userdata(self, frequency=PER_INSTANCE):
         """
@@ -629,7 +796,8 @@
 
     def _find_networking_config(self):
         disable_file = os.path.join(
-            self.paths.get_cpath('data'), 'upgraded-network')
+            self.paths.get_cpath("data"), "upgraded-network"
+        )
         if os.path.exists(disable_file):
             return (None, disable_file)
 
@@ -637,12 +805,13 @@
             NetworkConfigSource.cmdline: cmdline.read_kernel_cmdline_config(),
             NetworkConfigSource.initramfs: cmdline.read_initramfs_config(),
             NetworkConfigSource.ds: None,
-            NetworkConfigSource.system_cfg: self.cfg.get('network'),
+            NetworkConfigSource.system_cfg: self.cfg.get("network"),
         }
 
-        if self.datasource and hasattr(self.datasource, 'network_config'):
-            available_cfgs[NetworkConfigSource.ds] = (
-                self.datasource.network_config)
+        if self.datasource and hasattr(self.datasource, "network_config"):
+            available_cfgs[
+                NetworkConfigSource.ds
+            ] = self.datasource.network_config
 
         if self.datasource:
             order = self.datasource.network_config_sources
@@ -650,12 +819,17 @@
             order = sources.DataSource.network_config_sources
         for cfg_source in order:
             if not hasattr(NetworkConfigSource, cfg_source):
-                LOG.warning('data source specifies an invalid network'
-                            ' cfg_source: %s', cfg_source)
+                LOG.warning(
+                    "data source specifies an invalid network cfg_source: %s",
+                    cfg_source,
+                )
                 continue
             if cfg_source not in available_cfgs:
-                LOG.warning('data source specifies an unavailable network'
-                            ' cfg_source: %s', cfg_source)
+                LOG.warning(
+                    "data source specifies an unavailable network"
+                    " cfg_source: %s",
+                    cfg_source,
+                )
                 continue
             ncfg = available_cfgs[cfg_source]
             if net.is_disabled_cfg(ncfg):
@@ -663,8 +837,10 @@
                 return (None, cfg_source)
             if ncfg:
                 return (ncfg, cfg_source)
-        return (self.distro.generate_fallback_config(),
-                NetworkConfigSource.fallback)
+        return (
+            self.distro.generate_fallback_config(),
+            NetworkConfigSource.fallback,
+        )
 
     def _apply_netcfg_names(self, netcfg):
         try:
@@ -673,27 +849,60 @@
         except Exception as e:
             LOG.warning("Failed to rename devices: %s", e)
 
+    def _get_per_boot_network_semaphore(self):
+        return namedtuple("Semaphore", "semaphore args")(
+            helpers.FileSemaphores(self.paths.get_runpath("sem")),
+            ("apply_network_config", PER_ONCE),
+        )
+
+    def _network_already_configured(self) -> bool:
+        sem = self._get_per_boot_network_semaphore()
+        return sem.semaphore.has_run(*sem.args)
+
     def apply_network_config(self, bring_up):
-        # get a network config
+        """Apply the network config.
+
+        Find the config, determine whether to apply it, apply it via
+        the distro, and optionally bring it up
+        """
         netcfg, src = self._find_networking_config()
         if netcfg is None:
             LOG.info("network config is disabled by %s", src)
             return
 
-        # request an update if needed/available
-        if self.datasource is not NULL_DATA_SOURCE:
-            if not self.is_new_instance():
-                if not self.datasource.update_metadata([EventType.BOOT]):
-                    LOG.debug(
-                        "No network config applied. Neither a new instance"
-                        " nor datasource network update on '%s' event",
-                        EventType.BOOT)
-                    # nothing new, but ensure proper names
-                    self._apply_netcfg_names(netcfg)
-                    return
-            else:
-                # refresh netcfg after update
-                netcfg, src = self._find_networking_config()
+        def event_enabled_and_metadata_updated(event_type):
+            return (
+                update_event_enabled(
+                    datasource=self.datasource,
+                    cfg=self.cfg,
+                    event_source_type=event_type,
+                    scope=EventScope.NETWORK,
+                )
+                and self.datasource.update_metadata_if_supported([event_type])
+            )
+
+        def should_run_on_boot_event():
+            return (
+                not self._network_already_configured()
+                and event_enabled_and_metadata_updated(EventType.BOOT)
+            )
+
+        if (
+            self.datasource is not NULL_DATA_SOURCE
+            and not self.is_new_instance()
+            and not should_run_on_boot_event()
+            and not event_enabled_and_metadata_updated(EventType.BOOT_LEGACY)
+        ):
+            LOG.debug(
+                "No network config applied. Neither a new instance"
+                " nor datasource network update allowed"
+            )
+            # nothing new, but ensure proper names
+            self._apply_netcfg_names(netcfg)
+            return
+
+        # refresh netcfg after update
+        netcfg, src = self._find_networking_config()
 
         # ensure all physical devices in config are present
         self.distro.networking.wait_for_physdevs(netcfg)
@@ -702,18 +911,32 @@
         self._apply_netcfg_names(netcfg)
 
         # rendering config
-        LOG.info("Applying network configuration from %s bringup=%s: %s",
-                 src, bring_up, netcfg)
+        LOG.info(
+            "Applying network configuration from %s bringup=%s: %s",
+            src,
+            bring_up,
+            netcfg,
+        )
+
+        sem = self._get_per_boot_network_semaphore()
         try:
-            return self.distro.apply_network_config(netcfg, bring_up=bring_up)
+            with sem.semaphore.lock(*sem.args):
+                return self.distro.apply_network_config(
+                    netcfg, bring_up=bring_up
+                )
         except net.RendererNotFoundError as e:
-            LOG.error("Unable to render networking. Network config is "
-                      "likely broken: %s", e)
+            LOG.error(
+                "Unable to render networking. Network config is "
+                "likely broken: %s",
+                e,
+            )
             return
         except NotImplementedError:
-            LOG.warning("distro '%s' does not implement apply_network_config. "
-                        "networking may not be configured properly.",
-                        self.distro)
+            LOG.warning(
+                "distro '%s' does not implement apply_network_config. "
+                "networking may not be configured properly.",
+                self.distro,
+            )
             return
 
@@ -725,18 +948,22 @@ class Modules(object):
         self._cached_cfg = None
         if reporter is None:
             reporter = events.ReportEventStack(
-                name="module-reporter", description="module-desc",
-                reporting_enabled=False)
+                name="module-reporter",
+                description="module-desc",
+                reporting_enabled=False,
+            )
         self.reporter = reporter
 
     @property
     def cfg(self):
         # None check to avoid empty case causing re-reading
         if self._cached_cfg is None:
-            merger = helpers.ConfigMerger(paths=self.init.paths,
-                                          datasource=self.init.datasource,
-                                          additional_fns=self.cfg_files,
-                                          base_cfg=self.init.cfg)
+            merger = helpers.ConfigMerger(
+                paths=self.init.paths,
+                datasource=self.init.datasource,
+                additional_fns=self.cfg_files,
+                base_cfg=self.init.cfg,
+            )
             self._cached_cfg = merger.cfg
             # LOG.debug("Loading 'module' config %s", self._cached_cfg)
         # Only give out a copy so that others can't modify this...
@@ -757,57 +984,67 @@ class Modules(object):
             if not item:
                 continue
             if isinstance(item, str):
-                module_list.append({
-                    'mod': item.strip(),
-                })
+                module_list.append(
+                    {
+                        "mod": item.strip(),
+                    }
+                )
             elif isinstance(item, (list)):
                 contents = {}
                 # Meant to fall through...
                 if len(item) >= 1:
-                    contents['mod'] = item[0].strip()
+                    contents["mod"] = item[0].strip()
                 if len(item) >= 2:
-                    contents['freq'] = item[1].strip()
+                    contents["freq"] = item[1].strip()
                 if len(item) >= 3:
-                    contents['args'] = item[2:]
+                    contents["args"] = item[2:]
                 if contents:
                     module_list.append(contents)
             elif isinstance(item, (dict)):
                 contents = {}
                 valid = False
-                if 'name' in item:
-                    contents['mod'] = item['name'].strip()
+                if "name" in item:
+                    contents["mod"] = item["name"].strip()
                     valid = True
-                if 'frequency' in item:
-                    contents['freq'] = item['frequency'].strip()
-                if 'args' in item:
-                    contents['args'] = item['args'] or []
+                if "frequency" in item:
+                    contents["freq"] = item["frequency"].strip()
+                if "args" in item:
+                    contents["args"] = item["args"] or []
                 if contents and valid:
                     module_list.append(contents)
             else:
-                raise TypeError(("Failed to read '%s' item in config,"
-                                 " unknown type %s") %
-                                (item, type_utils.obj_name(item)))
+                raise TypeError(
+                    "Failed to read '%s' item in config, unknown type %s"
+                    % (item, type_utils.obj_name(item))
+                )
         return module_list
 
     def _fixup_modules(self, raw_mods):
         mostly_mods = []
         for raw_mod in raw_mods:
-            raw_name = raw_mod['mod']
-            freq = raw_mod.get('freq')
-            run_args = raw_mod.get('args') or []
+            raw_name = raw_mod["mod"]
+            freq = raw_mod.get("freq")
+            run_args = raw_mod.get("args") or []
             mod_name = config.form_module_name(raw_name)
             if not mod_name:
                 continue
             if freq and freq not in FREQUENCIES:
-                LOG.warning(("Config specified module %s"
-                             " has an unknown frequency %s"), raw_name, freq)
+                LOG.warning(
+                    "Config specified module %s has an unknown frequency %s",
+                    raw_name,
+                    freq,
+                )
                 # Reset it so when ran it will get set to a known value
                 freq = None
             mod_locs, looked_locs = importer.find_module(
-                mod_name, ['', type_utils.obj_name(config)], ['handle'])
+                mod_name, ["", type_utils.obj_name(config)], ["handle"]
+            )
             if not mod_locs:
-                LOG.warning("Could not find module named %s (searched %s)",
-                            mod_name, looked_locs)
+                LOG.warning(
+                    "Could not find module named %s (searched %s)",
+                    mod_name,
+                    looked_locs,
+                )
                 continue
             mod = config.fixup_module(importer.import_module(mod_locs[0]))
             mostly_mods.append([mod, raw_name, freq, run_args])
@@ -826,15 +1063,15 @@ class Modules(object):
                 freq = mod.frequency
                 if freq not in FREQUENCIES:
                     freq = PER_INSTANCE
-                LOG.debug("Running module %s (%s) with frequency %s",
-                          name, mod, freq)
+                LOG.debug(
+                    "Running module %s (%s) with frequency %s", name, mod, freq
+                )
 
                 # Use the configs logger and not our own
                 # TODO(harlowja): possibly check the module
                 # for having a LOG attr and just give it back
                 # its own logger?
-                func_args = [name, self.cfg,
-                             cc, config.LOG, args]
+                func_args = [name, self.cfg, cc, config.LOG, args]
                 # Mark it as having started running
                 which_ran.append(name)
                 # This name will affect the semaphore name created
@@ -842,11 +1079,13 @@ class Modules(object):
                 desc = "running %s with frequency %s" % (run_name, freq)
 
                 myrep = events.ReportEventStack(
-                    name=run_name, description=desc, parent=self.reporter)
+                    name=run_name, description=desc, parent=self.reporter
+                )
 
                 with myrep:
-                    ran, _r = cc.run(run_name, mod.handle, func_args,
-                                     freq=freq)
+                    ran, _r = cc.run(
+                        run_name, mod.handle, func_args, freq=freq
+                    )
                     if ran:
                         myrep.message = "%s ran successfully" % run_name
                     else:
@@ -860,9 +1099,9 @@ class Modules(object):
     def run_single(self, mod_name, args=None, freq=None):
         # Form the users module 'specs'
         mod_to_be = {
-            'mod': mod_name,
-            'args': args,
-            'freq': freq,
+            "mod": mod_name,
+            "args": args,
+            "freq": freq,
         }
         # Now resume doing the normal fixups and running
         raw_mods = [mod_to_be]
@@ -876,13 +1115,14 @@ class Modules(object):
 
         skipped = []
         forced = []
-        overridden = self.cfg.get('unverified_modules', [])
+        overridden = self.cfg.get("unverified_modules", [])
         active_mods = []
         all_distros = set([distros.ALL_DISTROS])
         for (mod, name, _freq, _args) in mostly_mods:
             worked_distros = set(mod.distros)  # Minimally [] per fixup_modules
             worked_distros.update(
-                distros.Distro.expand_osfamily(mod.osfamilies))
+                distros.Distro.expand_osfamily(mod.osfamilies)
+            )
 
             # Skip only when the following conditions are all met:
             #  - distros are defined in the module != ALL_DISTROS
@@ -898,12 +1138,15 @@ class Modules(object):
                 active_mods.append([mod, name, _freq, _args])
 
         if skipped:
-            LOG.info("Skipping modules '%s' because they are not verified "
-                     "on distro '%s'. To run anyway, add them to "
-                     "'unverified_modules' in config.",
-                     ','.join(skipped), d_name)
+            LOG.info(
+                "Skipping modules '%s' because they are not verified "
+                "on distro '%s'. To run anyway, add them to "
+                "'unverified_modules' in config.",
+                ",".join(skipped),
+                d_name,
+            )
         if forced:
-            LOG.info("running unverified_modules: '%s'", ', '.join(forced))
+            LOG.info("running unverified_modules: '%s'", ", ".join(forced))
 
         return self._run_modules(active_mods)
 
@@ -923,7 +1166,9 @@ def fetch_base_config():
             read_runtime_config(),
             # Kernel/cmdline parameters override system config
             util.read_conf_from_cmdline(),
-        ], reverse=True)
+        ],
+        reverse=True,
+    )
 
 
 def _pkl_store(obj, fname):
@@ -953,8 +1198,11 @@ def _pkl_load(fname):
         return None
     try:
         return pickle.loads(pickle_contents)
+    except sources.DatasourceUnpickleUserDataError:
+        return None
     except Exception:
         util.logexc(LOG, "Failed loading pickled blob from %s", fname)
         return None
+
 
 # vi: ts=4 expandtab