Diffstat (limited to 'cloudinit/stages.py')
-rw-r--r--  cloudinit/stages.py | 766
1 file changed, 507 insertions(+), 259 deletions(-)
diff --git a/cloudinit/stages.py b/cloudinit/stages.py
index 0cce6e80..3f17294b 100644
--- a/cloudinit/stages.py
+++ b/cloudinit/stages.py
@@ -8,34 +8,34 @@ import copy
import os
import pickle
import sys
+from collections import namedtuple
+from typing import Dict, Set # noqa: F401
-from cloudinit.settings import (
- FREQUENCIES, CLOUD_CONFIG, PER_INSTANCE, RUN_CLOUD_CONFIG)
-
-from cloudinit import handlers
+from cloudinit import cloud, config, distros, handlers, helpers, importer
+from cloudinit import log as logging
+from cloudinit import net, sources, type_utils, util
+from cloudinit.event import EventScope, EventType, userdata_to_events
# Default handlers (used if not overridden)
from cloudinit.handlers.boot_hook import BootHookPartHandler
from cloudinit.handlers.cloud_config import CloudConfigPartHandler
from cloudinit.handlers.jinja_template import JinjaTemplatePartHandler
from cloudinit.handlers.shell_script import ShellScriptPartHandler
+from cloudinit.handlers.shell_script_by_frequency import (
+ ShellScriptByFreqPartHandler,
+)
from cloudinit.handlers.upstart_job import UpstartJobPartHandler
-
-from cloudinit.event import EventType
-from cloudinit.sources import NetworkConfigSource
-
-from cloudinit import cloud
-from cloudinit import config
-from cloudinit import distros
-from cloudinit import helpers
-from cloudinit import importer
-from cloudinit import log as logging
-from cloudinit import net
from cloudinit.net import cmdline
from cloudinit.reporting import events
-from cloudinit import sources
-from cloudinit import type_utils
-from cloudinit import util
+from cloudinit.settings import (
+ CLOUD_CONFIG,
+ FREQUENCIES,
+ PER_ALWAYS,
+ PER_INSTANCE,
+ PER_ONCE,
+ RUN_CLOUD_CONFIG,
+)
+from cloudinit.sources import NetworkConfigSource
LOG = logging.getLogger(__name__)
@@ -43,6 +43,60 @@ NULL_DATA_SOURCE = None
NO_PREVIOUS_INSTANCE_ID = "NO_PREVIOUS_INSTANCE_ID"
+def update_event_enabled(
+ datasource: sources.DataSource,
+ cfg: dict,
+ event_source_type: EventType,
+ scope: EventScope = None,
+) -> bool:
+ """Determine if a particular EventType is enabled.
+
+ For the `event_source_type` passed in, check whether this EventType
+ is enabled in the `updates` section of the userdata. If `updates`
+ is not specified in the userdata, check whether the event is one of
+ the `default_update_events` on the datasource. `scope` may be used
+ to narrow the check to a particular `EventScope`.
+
+ Note that on first boot, userdata may NOT be available yet. In this
+ case, we only have the data source's `default_update_events`,
+ so an event that should be enabled in userdata may be denied.
+ """
+ default_events = (
+ datasource.default_update_events
+ ) # type: Dict[EventScope, Set[EventType]]
+ user_events = userdata_to_events(
+ cfg.get("updates", {})
+ ) # type: Dict[EventScope, Set[EventType]]
+ # A value in the first will override a value in the second
+ allowed = util.mergemanydict(
+ [
+ copy.deepcopy(user_events),
+ copy.deepcopy(default_events),
+ ]
+ )
+ LOG.debug("Allowed events: %s", allowed)
+
+ if not scope:
+ scopes = allowed.keys()
+ else:
+ scopes = [scope]
+ scope_values = [s.value for s in scopes]
+
+ for evt_scope in scopes:
+ if event_source_type in allowed.get(evt_scope, []):
+ LOG.debug(
+ "Event Allowed: scope=%s EventType=%s",
+ evt_scope.value,
+ event_source_type,
+ )
+ return True
+
+ LOG.debug(
+ "Event Denied: scopes=%s EventType=%s", scope_values, event_source_type
+ )
+ return False
+
+
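[Sketch, not part of the patch: a minimal call of update_event_enabled. `ds` is an assumed, already-fetched sources.DataSource; the "updates" mapping mirrors the cloud-config schema that userdata_to_events consumes.]

    cfg = {"updates": {"network": {"when": ["boot"]}}}
    if update_event_enabled(
        datasource=ds,  # assumed datasource instance
        cfg=cfg,
        event_source_type=EventType.BOOT,
        scope=EventScope.NETWORK,
    ):
        pass  # BOOT is allowed for the NETWORK scope
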
class Init(object):
def __init__(self, ds_deps=None, reporter=None):
if ds_deps is not None:
@@ -60,8 +114,10 @@ class Init(object):
if reporter is None:
reporter = events.ReportEventStack(
- name="init-reporter", description="init-desc",
- reporting_enabled=False)
+ name="init-reporter",
+ description="init-desc",
+ reporting_enabled=False,
+ )
self.reporter = reporter
def _reset(self, reset_ds=False):
@@ -77,8 +133,8 @@ class Init(object):
def distro(self):
if not self._distro:
# Try to find the right class to use
- system_config = self._extract_cfg('system')
- distro_name = system_config.pop('distro', 'ubuntu')
+ system_config = self._extract_cfg("system")
+ distro_name = system_config.pop("distro", "ubuntu")
distro_cls = distros.fetch(distro_name)
LOG.debug("Using distro class %s", distro_cls)
self._distro = distro_cls(distro_name, system_config, self.paths)
@@ -92,19 +148,19 @@ class Init(object):
@property
def cfg(self):
- return self._extract_cfg('restricted')
+ return self._extract_cfg("restricted")
def _extract_cfg(self, restriction):
# Ensure the config has actually been read
self.read_cfg()
# Nobody gets the real config
ocfg = copy.deepcopy(self._cfg)
- if restriction == 'restricted':
- ocfg.pop('system_info', None)
- elif restriction == 'system':
- ocfg = util.get_cfg_by_path(ocfg, ('system_info',), {})
- elif restriction == 'paths':
- ocfg = util.get_cfg_by_path(ocfg, ('system_info', 'paths'), {})
+ if restriction == "restricted":
+ ocfg.pop("system_info", None)
+ elif restriction == "system":
+ ocfg = util.get_cfg_by_path(ocfg, ("system_info",), {})
+ elif restriction == "paths":
+ ocfg = util.get_cfg_by_path(ocfg, ("system_info", "paths"), {})
if not isinstance(ocfg, (dict)):
ocfg = {}
return ocfg
@@ -112,24 +168,26 @@ class Init(object):
@property
def paths(self):
if not self._paths:
- path_info = self._extract_cfg('paths')
+ path_info = self._extract_cfg("paths")
self._paths = helpers.Paths(path_info, self.datasource)
return self._paths
def _initial_subdirs(self):
c_dir = self.paths.cloud_dir
+ run_dir = self.paths.run_dir
initial_dirs = [
c_dir,
- os.path.join(c_dir, 'scripts'),
- os.path.join(c_dir, 'scripts', 'per-instance'),
- os.path.join(c_dir, 'scripts', 'per-once'),
- os.path.join(c_dir, 'scripts', 'per-boot'),
- os.path.join(c_dir, 'scripts', 'vendor'),
- os.path.join(c_dir, 'seed'),
- os.path.join(c_dir, 'instances'),
- os.path.join(c_dir, 'handlers'),
- os.path.join(c_dir, 'sem'),
- os.path.join(c_dir, 'data'),
+ os.path.join(c_dir, "scripts"),
+ os.path.join(c_dir, "scripts", "per-instance"),
+ os.path.join(c_dir, "scripts", "per-once"),
+ os.path.join(c_dir, "scripts", "per-boot"),
+ os.path.join(c_dir, "scripts", "vendor"),
+ os.path.join(c_dir, "seed"),
+ os.path.join(c_dir, "instances"),
+ os.path.join(c_dir, "handlers"),
+ os.path.join(c_dir, "sem"),
+ os.path.join(c_dir, "data"),
+ os.path.join(run_dir, "sem"),
]
return initial_dirs
@@ -146,10 +204,10 @@ class Init(object):
def _initialize_filesystem(self):
util.ensure_dirs(self._initial_subdirs())
- log_file = util.get_cfg_option_str(self.cfg, 'def_log_file')
+ log_file = util.get_cfg_option_str(self.cfg, "def_log_file")
if log_file:
- util.ensure_file(log_file, preserve_mode=True)
- perms = self.cfg.get('syslog_fix_perms')
+ util.ensure_file(log_file, mode=0o640, preserve_mode=True)
+ perms = self.cfg.get("syslog_fix_perms")
if not perms:
perms = {}
if not isinstance(perms, list):
@@ -164,8 +222,12 @@ class Init(object):
except OSError as e:
error = e
- LOG.warning("Failed changing perms on '%s'. tried: %s. %s",
- log_file, ','.join(perms), error)
+ LOG.warning(
+ "Failed changing perms on '%s'. tried: %s. %s",
+ log_file,
+ ",".join(perms),
+ error,
+ )
def read_cfg(self, extra_fns=None):
# None check so that we don't keep on re-loading if empty
@@ -175,37 +237,41 @@ class Init(object):
def _read_cfg(self, extra_fns):
no_cfg_paths = helpers.Paths({}, self.datasource)
- merger = helpers.ConfigMerger(paths=no_cfg_paths,
- datasource=self.datasource,
- additional_fns=extra_fns,
- base_cfg=fetch_base_config())
+ merger = helpers.ConfigMerger(
+ paths=no_cfg_paths,
+ datasource=self.datasource,
+ additional_fns=extra_fns,
+ base_cfg=fetch_base_config(),
+ )
return merger.cfg
def _restore_from_cache(self):
# We try to restore from a current link and static path
# by using the instance link; if purge_cache was called,
# the file won't exist.
- return _pkl_load(self.paths.get_ipath_cur('obj_pkl'))
+ return _pkl_load(self.paths.get_ipath_cur("obj_pkl"))
def _write_to_cache(self):
if self.datasource is NULL_DATA_SOURCE:
return False
- if util.get_cfg_option_bool(self.cfg, 'manual_cache_clean', False):
+ if util.get_cfg_option_bool(self.cfg, "manual_cache_clean", False):
# The empty file in instance/ dir indicates manual cleaning,
# and can be read by ds-identify.
util.write_file(
self.paths.get_ipath_cur("manual_clean_marker"),
- omode="w", content="")
+ omode="w",
+ content="",
+ )
return _pkl_store(self.datasource, self.paths.get_ipath_cur("obj_pkl"))
def _get_datasources(self):
# Any config provided?
- pkg_list = self.cfg.get('datasource_pkg_list') or []
+ pkg_list = self.cfg.get("datasource_pkg_list") or []
# Add the defaults at the end
- for n in ['', type_utils.obj_name(sources)]:
+ for n in ["", type_utils.obj_name(sources)]:
if n not in pkg_list:
pkg_list.append(n)
- cfg_list = self.cfg.get('datasource_list') or []
+ cfg_list = self.cfg.get("datasource_list") or []
return (cfg_list, pkg_list)
def _restore_from_checked_cache(self, existing):
@@ -216,7 +282,7 @@ class Init(object):
if not ds:
return (None, "no cache found")
- run_iid_fn = self.paths.get_runpath('instance_id')
+ run_iid_fn = self.paths.get_runpath("instance_id")
if os.path.exists(run_iid_fn):
run_iid = util.load_file(run_iid_fn).strip()
else:
@@ -227,20 +293,22 @@ class Init(object):
elif existing == "trust":
return (ds, "restored from cache: %s" % ds)
else:
- if (hasattr(ds, 'check_instance_id') and
- ds.check_instance_id(self.cfg)):
+ if hasattr(ds, "check_instance_id") and ds.check_instance_id(
+ self.cfg
+ ):
return (ds, "restored from checked cache: %s" % ds)
else:
return (None, "cache invalid in datasource: %s" % ds)
- def _get_data_source(self, existing):
+ def _get_data_source(self, existing) -> sources.DataSource:
if self.datasource is not NULL_DATA_SOURCE:
return self.datasource
with events.ReportEventStack(
- name="check-cache",
- description="attempting to read from cache [%s]" % existing,
- parent=self.reporter) as myrep:
+ name="check-cache",
+ description="attempting to read from cache [%s]" % existing,
+ parent=self.reporter,
+ ) as myrep:
ds, desc = self._restore_from_checked_cache(existing)
myrep.description = desc
@@ -252,21 +320,24 @@ class Init(object):
(cfg_list, pkg_list) = self._get_datasources()
# Deep copy so that user-data handlers cannot modify it
# (which would affect user-data handlers down the line...)
- (ds, dsname) = sources.find_source(self.cfg,
- self.distro,
- self.paths,
- copy.deepcopy(self.ds_deps),
- cfg_list,
- pkg_list, self.reporter)
+ (ds, dsname) = sources.find_source(
+ self.cfg,
+ self.distro,
+ self.paths,
+ copy.deepcopy(self.ds_deps),
+ cfg_list,
+ pkg_list,
+ self.reporter,
+ )
LOG.info("Loaded datasource %s - %s", dsname, ds)
- self.datasource = ds
+ self.datasource = ds # type: sources.DataSource
# Ensure we adjust our path member's datasource
# now that we have one (thus allowing ipath to be used)
self._reset()
return ds
def _get_instance_subdirs(self):
- return ['handlers', 'scripts', 'sem']
+ return ["handlers", "scripts", "sem"]
def _get_ipath(self, subname=None):
# Force a check to see if anything
@@ -274,8 +345,10 @@ class Init(object):
# then a datasource has not been assigned...
instance_dir = self.paths.get_ipath(subname)
if not instance_dir:
- raise RuntimeError(("No instance directory is available."
- " Has a datasource been fetched??"))
+ raise RuntimeError(
+ "No instance directory is available."
+ " Has a datasource been fetched??"
+ )
return instance_dir
def _reflect_cur_instance(self):
@@ -293,12 +366,12 @@ class Init(object):
# Write out information on what is being used for the current instance
# and what may have been used for a previous instance...
- dp = self.paths.get_cpath('data')
+ dp = self.paths.get_cpath("data")
# Write what the datasource was and is..
ds = "%s: %s" % (type_utils.obj_name(self.datasource), self.datasource)
previous_ds = None
- ds_fn = os.path.join(idir, 'datasource')
+ ds_fn = os.path.join(idir, "datasource")
try:
previous_ds = util.load_file(ds_fn).strip()
except Exception:
@@ -306,18 +379,20 @@ class Init(object):
if not previous_ds:
previous_ds = ds
util.write_file(ds_fn, "%s\n" % ds)
- util.write_file(os.path.join(dp, 'previous-datasource'),
- "%s\n" % (previous_ds))
+ util.write_file(
+ os.path.join(dp, "previous-datasource"), "%s\n" % (previous_ds)
+ )
# What the instance id was and is...
iid = self.datasource.get_instance_id()
- iid_fn = os.path.join(dp, 'instance-id')
+ iid_fn = os.path.join(dp, "instance-id")
previous_iid = self.previous_iid()
util.write_file(iid_fn, "%s\n" % iid)
- util.write_file(self.paths.get_runpath('instance_id'), "%s\n" % iid)
- util.write_file(os.path.join(dp, 'previous-instance-id'),
- "%s\n" % (previous_iid))
+ util.write_file(self.paths.get_runpath("instance_id"), "%s\n" % iid)
+ util.write_file(
+ os.path.join(dp, "previous-instance-id"), "%s\n" % (previous_iid)
+ )
self._write_to_cache()
# Ensure needed components are regenerated
@@ -330,8 +405,8 @@ class Init(object):
if self._previous_iid is not None:
return self._previous_iid
- dp = self.paths.get_cpath('data')
- iid_fn = os.path.join(dp, 'instance-id')
+ dp = self.paths.get_cpath("data")
+ iid_fn = os.path.join(dp, "instance-id")
try:
self._previous_iid = util.load_file(iid_fn).strip()
except Exception:
@@ -341,9 +416,16 @@ class Init(object):
return self._previous_iid
def is_new_instance(self):
+ """Return true if this is a new instance.
+
+ If datasource has already been initialized, this will return False,
+ even on first boot.
+ """
previous = self.previous_iid()
- ret = (previous == NO_PREVIOUS_INSTANCE_ID or
- previous != self.datasource.get_instance_id())
+ ret = (
+ previous == NO_PREVIOUS_INSTANCE_ID
+ or previous != self.datasource.get_instance_id()
+ )
return ret
def fetch(self, existing="check"):
@@ -354,75 +436,102 @@ class Init(object):
def cloudify(self):
# Form the needed options to cloudify our members
- return cloud.Cloud(self.datasource,
- self.paths, self.cfg,
- self.distro, helpers.Runners(self.paths),
- reporter=self.reporter)
+ return cloud.Cloud(
+ self.datasource,
+ self.paths,
+ self.cfg,
+ self.distro,
+ helpers.Runners(self.paths),
+ reporter=self.reporter,
+ )
def update(self):
- self._store_userdata()
- self._store_vendordata()
+ self._store_rawdata(self.datasource.get_userdata_raw(), "userdata")
+ self._store_processeddata(self.datasource.get_userdata(), "userdata")
+ self._store_raw_vendordata(
+ self.datasource.get_vendordata_raw(), "vendordata"
+ )
+ self._store_processeddata(
+ self.datasource.get_vendordata(), "vendordata"
+ )
+ self._store_raw_vendordata(
+ self.datasource.get_vendordata2_raw(), "vendordata2"
+ )
+ self._store_processeddata(
+ self.datasource.get_vendordata2(), "vendordata2"
+ )
def setup_datasource(self):
- with events.ReportEventStack("setup-datasource",
- "setting up datasource",
- parent=self.reporter):
+ with events.ReportEventStack(
+ "setup-datasource", "setting up datasource", parent=self.reporter
+ ):
if self.datasource is None:
raise RuntimeError("Datasource is None, cannot setup.")
self.datasource.setup(is_new_instance=self.is_new_instance())
def activate_datasource(self):
- with events.ReportEventStack("activate-datasource",
- "activating datasource",
- parent=self.reporter):
+ with events.ReportEventStack(
+ "activate-datasource",
+ "activating datasource",
+ parent=self.reporter,
+ ):
if self.datasource is None:
raise RuntimeError("Datasource is None, cannot activate.")
- self.datasource.activate(cfg=self.cfg,
- is_new_instance=self.is_new_instance())
+ self.datasource.activate(
+ cfg=self.cfg, is_new_instance=self.is_new_instance()
+ )
self._write_to_cache()
- def _store_userdata(self):
- raw_ud = self.datasource.get_userdata_raw()
- if raw_ud is None:
- raw_ud = b''
- util.write_file(self._get_ipath('userdata_raw'), raw_ud, 0o600)
- # processed userdata is a Mime message, so write it as string.
- processed_ud = self.datasource.get_userdata()
- if processed_ud is None:
- raw_ud = ''
- util.write_file(self._get_ipath('userdata'), str(processed_ud), 0o600)
-
- def _store_vendordata(self):
- raw_vd = self.datasource.get_vendordata_raw()
- if raw_vd is None:
- raw_vd = b''
- util.write_file(self._get_ipath('vendordata_raw'), raw_vd, 0o600)
- # processed vendor data is a Mime message, so write it as string.
- processed_vd = str(self.datasource.get_vendordata())
- if processed_vd is None:
- processed_vd = ''
- util.write_file(self._get_ipath('vendordata'), str(processed_vd),
- 0o600)
+ def _store_rawdata(self, data, datasource):
+ # Raw data is bytes, not a string
+ if data is None:
+ data = b""
+ util.write_file(self._get_ipath("%s_raw" % datasource), data, 0o600)
+
+ def _store_raw_vendordata(self, data, datasource):
+ # Only these data types are supported for raw vendordata
+ if data is not None and type(data) not in [bytes, str, list]:
+ raise TypeError(
+ "vendordata_raw is unsupported type '%s'" % str(type(data))
+ )
+ # This data may be a list, convert it to a string if so
+ if isinstance(data, list):
+ data = util.json_dumps(data)
+ self._store_rawdata(data, datasource)
+
+ def _store_processeddata(self, processed_data, datasource):
+ # processed data is a MIME message, so write it as a string.
+ if processed_data is None:
+ processed_data = ""
+ util.write_file(
+ self._get_ipath(datasource), str(processed_data), 0o600
+ )
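[Sketch, not part of the patch: the list case handled above, with an illustrative payload. A list is serialized via util.json_dumps before _store_rawdata writes it with mode 0o600.]

    data = ["#cloud-config", {"runcmd": ["echo hello"]}]
    # _store_raw_vendordata(data, "vendordata2") json-dumps the list,
    # then writes it to <instance>/vendordata2_raw.
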
def _default_handlers(self, opts=None):
if opts is None:
opts = {}
- opts.update({
- 'paths': self.paths,
- 'datasource': self.datasource,
- })
+ opts.update(
+ {
+ "paths": self.paths,
+ "datasource": self.datasource,
+ }
+ )
# TODO(harlowja) Hmmm, should we dynamically import these??
cloudconfig_handler = CloudConfigPartHandler(**opts)
shellscript_handler = ShellScriptPartHandler(**opts)
def_handlers = [
cloudconfig_handler,
shellscript_handler,
+ ShellScriptByFreqPartHandler(PER_ALWAYS, **opts),
+ ShellScriptByFreqPartHandler(PER_INSTANCE, **opts),
+ ShellScriptByFreqPartHandler(PER_ONCE, **opts),
BootHookPartHandler(**opts),
UpstartJobPartHandler(**opts),
]
opts.update(
- {'sub_handlers': [cloudconfig_handler, shellscript_handler]})
+ {"sub_handlers": [cloudconfig_handler, shellscript_handler]}
+ )
def_handlers.append(JinjaTemplatePartHandler(**opts))
return def_handlers
@@ -431,11 +540,23 @@ class Init(object):
def _default_vendordata_handlers(self):
return self._default_handlers(
- opts={'script_path': 'vendor_scripts',
- 'cloud_config_path': 'vendor_cloud_config'})
+ opts={
+ "script_path": "vendor_scripts",
+ "cloud_config_path": "vendor_cloud_config",
+ }
+ )
+
+ def _default_vendordata2_handlers(self):
+ return self._default_handlers(
+ opts={
+ "script_path": "vendor_scripts",
+ "cloud_config_path": "vendor2_cloud_config",
+ }
+ )
- def _do_handlers(self, data_msg, c_handlers_list, frequency,
- excluded=None):
+ def _do_handlers(
+ self, data_msg, c_handlers_list, frequency, excluded=None
+ ):
"""
Generalized handlers suitable for use with either vendordata
or userdata
@@ -462,21 +583,31 @@ class Init(object):
for (fname, mod_name) in potential_handlers.items():
try:
mod_locs, looked_locs = importer.find_module(
- mod_name, [''], ['list_types', 'handle_part'])
+ mod_name, [""], ["list_types", "handle_part"]
+ )
if not mod_locs:
- LOG.warning("Could not find a valid user-data handler"
- " named %s in file %s (searched %s)",
- mod_name, fname, looked_locs)
+ LOG.warning(
+ "Could not find a valid user-data handler"
+ " named %s in file %s (searched %s)",
+ mod_name,
+ fname,
+ looked_locs,
+ )
continue
mod = importer.import_module(mod_locs[0])
mod = handlers.fixup_handler(mod)
types = c_handlers.register(mod)
if types:
- LOG.debug("Added custom handler for %s [%s] from %s",
- types, mod, fname)
+ LOG.debug(
+ "Added custom handler for %s [%s] from %s",
+ types,
+ mod,
+ fname,
+ )
except Exception:
- util.logexc(LOG, "Failed to register handler from %s",
- fname)
+ util.logexc(
+ LOG, "Failed to register handler from %s", fname
+ )
# This keeps track of all the active handlers
c_handlers = helpers.ContentHandlers()
@@ -508,17 +639,17 @@ class Init(object):
def walk_handlers(excluded):
# Walk the user data
part_data = {
- 'handlers': c_handlers,
+ "handlers": c_handlers,
# Any new handlers that are encountered get written here
- 'handlerdir': idir,
- 'data': data,
+ "handlerdir": idir,
+ "data": data,
# The default frequency if handlers don't have one
- 'frequency': frequency,
+ "frequency": frequency,
# This will be used when new handlers are found
# to help write their contents to files with numbered
# names...
- 'handlercount': 0,
- 'excluded': excluded,
+ "handlercount": 0,
+ "excluded": excluded,
}
handlers.walk(data_msg, handlers.walker_callback, data=part_data)
@@ -544,18 +675,29 @@ class Init(object):
def consume_data(self, frequency=PER_INSTANCE):
# Consume the userdata first, because we want to let the part
# handlers run first (for merging stuff)
- with events.ReportEventStack("consume-user-data",
- "reading and applying user-data",
- parent=self.reporter):
- if util.get_cfg_option_bool(self.cfg, 'allow_userdata', True):
+ with events.ReportEventStack(
+ "consume-user-data",
+ "reading and applying user-data",
+ parent=self.reporter,
+ ):
+ if util.get_cfg_option_bool(self.cfg, "allow_userdata", True):
self._consume_userdata(frequency)
else:
- LOG.debug('allow_userdata = False: discarding user-data')
+ LOG.debug("allow_userdata = False: discarding user-data")
+
+ with events.ReportEventStack(
+ "consume-vendor-data",
+ "reading and applying vendor-data",
+ parent=self.reporter,
+ ):
+ self._consume_vendordata("vendordata", frequency)
- with events.ReportEventStack("consume-vendor-data",
- "reading and applying vendor-data",
- parent=self.reporter):
- self._consume_vendordata(frequency)
+ with events.ReportEventStack(
+ "consume-vendor-data2",
+ "reading and applying vendor-data2",
+ parent=self.reporter,
+ ):
+ self._consume_vendordata("vendordata2", frequency)
# Perform post-consumption adjustments so that
# modules that run during the init stage reflect
@@ -568,50 +710,75 @@ class Init(object):
# objects before the load of the userdata happened,
# this is expected.
- def _consume_vendordata(self, frequency=PER_INSTANCE):
+ def _consume_vendordata(self, vendor_source, frequency=PER_INSTANCE):
"""
Consume the vendordata and run the part handlers on it
"""
+
# User-data should have been consumed first.
# So we merge the other available cloud-configs (everything except
# vendor provided), and check whether or not we should consume
# vendor data at all. That gives the user or system a chance to override.
- if not self.datasource.get_vendordata_raw():
- LOG.debug("no vendordata from datasource")
- return
-
- _cc_merger = helpers.ConfigMerger(paths=self._paths,
- datasource=self.datasource,
- additional_fns=[],
- base_cfg=self.cfg,
- include_vendor=False)
- vdcfg = _cc_merger.cfg.get('vendor_data', {})
+ if vendor_source == "vendordata":
+ if not self.datasource.get_vendordata_raw():
+ LOG.debug("no vendordata from datasource")
+ return
+ cfg_name = "vendor_data"
+ elif vendor_source == "vendordata2":
+ if not self.datasource.get_vendordata2_raw():
+ LOG.debug("no vendordata2 from datasource")
+ return
+ cfg_name = "vendor_data2"
+ else:
+ raise RuntimeError(
+ "vendor_source arg must be either 'vendordata'"
+ " or 'vendordata2'"
+ )
+
+ _cc_merger = helpers.ConfigMerger(
+ paths=self._paths,
+ datasource=self.datasource,
+ additional_fns=[],
+ base_cfg=self.cfg,
+ include_vendor=False,
+ )
+ vdcfg = _cc_merger.cfg.get(cfg_name, {})
if not isinstance(vdcfg, dict):
- vdcfg = {'enabled': False}
- LOG.warning("invalid 'vendor_data' setting. resetting to: %s",
- vdcfg)
+ vdcfg = {"enabled": False}
+ LOG.warning(
+ "invalid %s setting. resetting to: %s", cfg_name, vdcfg
+ )
- enabled = vdcfg.get('enabled')
- no_handlers = vdcfg.get('disabled_handlers', None)
+ enabled = vdcfg.get("enabled")
+ no_handlers = vdcfg.get("disabled_handlers", None)
if not util.is_true(enabled):
- LOG.debug("vendordata consumption is disabled.")
+ LOG.debug("%s consumption is disabled.", vendor_source)
return
- LOG.debug("vendor data will be consumed. disabled_handlers=%s",
- no_handlers)
+ LOG.debug(
+ "%s will be consumed. disabled_handlers=%s",
+ vendor_source,
+ no_handlers,
+ )
- # Ensure vendordata source fetched before activation (just incase)
- vendor_data_msg = self.datasource.get_vendordata()
+ # Ensure vendordata source is fetched before activation (just in case).
- # This keeps track of all the active handlers, while excluding what the
- # users doesn't want run, i.e. boot_hook, cloud_config, shell_script
- c_handlers_list = self._default_vendordata_handlers()
+ # c_handlers_list keeps track of all the active handlers, while
+ # excluding what the user doesn't want run, i.e. boot_hook,
+ # cloud_config, shell_script
+ if vendor_source == "vendordata":
+ vendor_data_msg = self.datasource.get_vendordata()
+ c_handlers_list = self._default_vendordata_handlers()
+ else:
+ vendor_data_msg = self.datasource.get_vendordata2()
+ c_handlers_list = self._default_vendordata2_handlers()
# Run the handlers
- self._do_handlers(vendor_data_msg, c_handlers_list, frequency,
- excluded=no_handlers)
+ self._do_handlers(
+ vendor_data_msg, c_handlers_list, frequency, excluded=no_handlers
+ )
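[Sketch, not part of the patch: the shape of the merged config this method inspects; values and handler names are illustrative. Per the code above, a non-dict value is reset to {"enabled": False}, and nothing is consumed unless `enabled` is true.]

    cfg = {
        "vendor_data2": {
            "enabled": True,
            # part handlers the user doesn't want run (illustrative):
            "disabled_handlers": ["shell_script", "boot_hook"],
        }
    }
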
def _consume_userdata(self, frequency=PER_INSTANCE):
"""
@@ -629,7 +796,8 @@ class Init(object):
def _find_networking_config(self):
disable_file = os.path.join(
- self.paths.get_cpath('data'), 'upgraded-network')
+ self.paths.get_cpath("data"), "upgraded-network"
+ )
if os.path.exists(disable_file):
return (None, disable_file)
@@ -637,12 +805,13 @@ class Init(object):
NetworkConfigSource.cmdline: cmdline.read_kernel_cmdline_config(),
NetworkConfigSource.initramfs: cmdline.read_initramfs_config(),
NetworkConfigSource.ds: None,
- NetworkConfigSource.system_cfg: self.cfg.get('network'),
+ NetworkConfigSource.system_cfg: self.cfg.get("network"),
}
- if self.datasource and hasattr(self.datasource, 'network_config'):
- available_cfgs[NetworkConfigSource.ds] = (
- self.datasource.network_config)
+ if self.datasource and hasattr(self.datasource, "network_config"):
+ available_cfgs[
+ NetworkConfigSource.ds
+ ] = self.datasource.network_config
if self.datasource:
order = self.datasource.network_config_sources
@@ -650,12 +819,17 @@ class Init(object):
order = sources.DataSource.network_config_sources
for cfg_source in order:
if not hasattr(NetworkConfigSource, cfg_source):
- LOG.warning('data source specifies an invalid network'
- ' cfg_source: %s', cfg_source)
+ LOG.warning(
+ "data source specifies an invalid network cfg_source: %s",
+ cfg_source,
+ )
continue
if cfg_source not in available_cfgs:
- LOG.warning('data source specifies an unavailable network'
- ' cfg_source: %s', cfg_source)
+ LOG.warning(
+ "data source specifies an unavailable network"
+ " cfg_source: %s",
+ cfg_source,
+ )
continue
ncfg = available_cfgs[cfg_source]
if net.is_disabled_cfg(ncfg):
@@ -663,8 +837,10 @@ class Init(object):
return (None, cfg_source)
if ncfg:
return (ncfg, cfg_source)
- return (self.distro.generate_fallback_config(),
- NetworkConfigSource.fallback)
+ return (
+ self.distro.generate_fallback_config(),
+ NetworkConfigSource.fallback,
+ )
def _apply_netcfg_names(self, netcfg):
try:
@@ -673,27 +849,60 @@ class Init(object):
except Exception as e:
LOG.warning("Failed to rename devices: %s", e)
+ def _get_per_boot_network_semaphore(self):
+ return namedtuple("Semaphore", "semaphore args")(
+ helpers.FileSemaphores(self.paths.get_runpath("sem")),
+ ("apply_network_config", PER_ONCE),
+ )
+
+ def _network_already_configured(self) -> bool:
+ sem = self._get_per_boot_network_semaphore()
+ return sem.semaphore.has_run(*sem.args)
+
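[Sketch, not part of the patch: how the per-boot semaphore gates reconfiguration inside Init. The semaphore directory comes from paths.get_runpath("sem"), which sits under the run directory; that directory is typically tmpfs and cleared on reboot, so PER_ONCE there effectively means once per boot.]

    sem = self._get_per_boot_network_semaphore()
    if not sem.semaphore.has_run(*sem.args):
        with sem.semaphore.lock(*sem.args):
            pass  # render/apply the network config once this boot
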
def apply_network_config(self, bring_up):
- # get a network config
+ """Apply the network config.
+
+ Find the config, determine whether to apply it, apply it via
+ the distro, and optionally bring it up
+ """
netcfg, src = self._find_networking_config()
if netcfg is None:
LOG.info("network config is disabled by %s", src)
return
- # request an update if needed/available
- if self.datasource is not NULL_DATA_SOURCE:
- if not self.is_new_instance():
- if not self.datasource.update_metadata([EventType.BOOT]):
- LOG.debug(
- "No network config applied. Neither a new instance"
- " nor datasource network update on '%s' event",
- EventType.BOOT)
- # nothing new, but ensure proper names
- self._apply_netcfg_names(netcfg)
- return
- else:
- # refresh netcfg after update
- netcfg, src = self._find_networking_config()
+ def event_enabled_and_metadata_updated(event_type):
+ return (
+ update_event_enabled(
+ datasource=self.datasource,
+ cfg=self.cfg,
+ event_source_type=event_type,
+ scope=EventScope.NETWORK,
+ )
+ and self.datasource.update_metadata_if_supported([event_type])
+ )
+
+ def should_run_on_boot_event():
+ return (
+ not self._network_already_configured()
+ and event_enabled_and_metadata_updated(EventType.BOOT)
+ )
+
+ if (
+ self.datasource is not NULL_DATA_SOURCE
+ and not self.is_new_instance()
+ and not should_run_on_boot_event()
+ and not event_enabled_and_metadata_updated(EventType.BOOT_LEGACY)
+ ):
+ LOG.debug(
+ "No network config applied. Neither a new instance"
+ " nor datasource network update allowed"
+ )
+ # nothing new, but ensure proper names
+ self._apply_netcfg_names(netcfg)
+ return
+
+ # refresh netcfg after update
+ netcfg, src = self._find_networking_config()
# ensure all physical devices in config are present
self.distro.networking.wait_for_physdevs(netcfg)
@@ -702,18 +911,32 @@ class Init(object):
self._apply_netcfg_names(netcfg)
# rendering config
- LOG.info("Applying network configuration from %s bringup=%s: %s",
- src, bring_up, netcfg)
+ LOG.info(
+ "Applying network configuration from %s bringup=%s: %s",
+ src,
+ bring_up,
+ netcfg,
+ )
+
+ sem = self._get_per_boot_network_semaphore()
try:
- return self.distro.apply_network_config(netcfg, bring_up=bring_up)
+ with sem.semaphore.lock(*sem.args):
+ return self.distro.apply_network_config(
+ netcfg, bring_up=bring_up
+ )
except net.RendererNotFoundError as e:
- LOG.error("Unable to render networking. Network config is "
- "likely broken: %s", e)
+ LOG.error(
+ "Unable to render networking. Network config is "
+ "likely broken: %s",
+ e,
+ )
return
except NotImplementedError:
- LOG.warning("distro '%s' does not implement apply_network_config. "
- "networking may not be configured properly.",
- self.distro)
+ LOG.warning(
+ "distro '%s' does not implement apply_network_config. "
+ "networking may not be configured properly.",
+ self.distro,
+ )
return
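[Not part of the patch: the skip condition above, flattened for readability. Network config is (re)applied when any of the following holds; otherwise only device renaming via _apply_netcfg_names happens.]

    # 1. there is no real datasource yet (NULL_DATA_SOURCE), or
    # 2. the instance-id changed since the last boot (is_new_instance), or
    # 3. a BOOT event is enabled, metadata refreshed, and the per-boot
    #    semaphore shows networking has not yet run this boot, or
    # 4. a BOOT_LEGACY event is enabled and metadata refreshed.
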
@@ -725,18 +948,22 @@ class Modules(object):
self._cached_cfg = None
if reporter is None:
reporter = events.ReportEventStack(
- name="module-reporter", description="module-desc",
- reporting_enabled=False)
+ name="module-reporter",
+ description="module-desc",
+ reporting_enabled=False,
+ )
self.reporter = reporter
@property
def cfg(self):
# None check to avoid empty case causing re-reading
if self._cached_cfg is None:
- merger = helpers.ConfigMerger(paths=self.init.paths,
- datasource=self.init.datasource,
- additional_fns=self.cfg_files,
- base_cfg=self.init.cfg)
+ merger = helpers.ConfigMerger(
+ paths=self.init.paths,
+ datasource=self.init.datasource,
+ additional_fns=self.cfg_files,
+ base_cfg=self.init.cfg,
+ )
self._cached_cfg = merger.cfg
# LOG.debug("Loading 'module' config %s", self._cached_cfg)
# Only give out a copy so that others can't modify this...
@@ -757,57 +984,67 @@ class Modules(object):
if not item:
continue
if isinstance(item, str):
- module_list.append({
- 'mod': item.strip(),
- })
+ module_list.append(
+ {
+ "mod": item.strip(),
+ }
+ )
elif isinstance(item, (list)):
contents = {}
# Meant to fall through...
if len(item) >= 1:
- contents['mod'] = item[0].strip()
+ contents["mod"] = item[0].strip()
if len(item) >= 2:
- contents['freq'] = item[1].strip()
+ contents["freq"] = item[1].strip()
if len(item) >= 3:
- contents['args'] = item[2:]
+ contents["args"] = item[2:]
if contents:
module_list.append(contents)
elif isinstance(item, (dict)):
contents = {}
valid = False
- if 'name' in item:
- contents['mod'] = item['name'].strip()
+ if "name" in item:
+ contents["mod"] = item["name"].strip()
valid = True
- if 'frequency' in item:
- contents['freq'] = item['frequency'].strip()
- if 'args' in item:
- contents['args'] = item['args'] or []
+ if "frequency" in item:
+ contents["freq"] = item["frequency"].strip()
+ if "args" in item:
+ contents["args"] = item["args"] or []
if contents and valid:
module_list.append(contents)
else:
- raise TypeError(("Failed to read '%s' item in config,"
- " unknown type %s") %
- (item, type_utils.obj_name(item)))
+ raise TypeError(
+ "Failed to read '%s' item in config, unknown type %s"
+ % (item, type_utils.obj_name(item))
+ )
return module_list
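[Sketch, not part of the patch: the three item forms the module-list parser accepts, with illustrative names; frequencies are drawn from FREQUENCIES.]

    raw = [
        "ssh",                                    # plain string
        ["byobu", "always"],                      # [name, freq, *args]
        {"name": "runcmd", "frequency": "once-per-instance"},
    ]
    # normalizes to:
    # [{"mod": "ssh"},
    #  {"mod": "byobu", "freq": "always"},
    #  {"mod": "runcmd", "freq": "once-per-instance"}]
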
def _fixup_modules(self, raw_mods):
mostly_mods = []
for raw_mod in raw_mods:
- raw_name = raw_mod['mod']
- freq = raw_mod.get('freq')
- run_args = raw_mod.get('args') or []
+ raw_name = raw_mod["mod"]
+ freq = raw_mod.get("freq")
+ run_args = raw_mod.get("args") or []
mod_name = config.form_module_name(raw_name)
if not mod_name:
continue
if freq and freq not in FREQUENCIES:
- LOG.warning(("Config specified module %s"
- " has an unknown frequency %s"), raw_name, freq)
+ LOG.warning(
+ "Config specified module %s has an unknown frequency %s",
+ raw_name,
+ freq,
+ )
# Reset it so that when run it will get set to a known value
freq = None
mod_locs, looked_locs = importer.find_module(
- mod_name, ['', type_utils.obj_name(config)], ['handle'])
+ mod_name, ["", type_utils.obj_name(config)], ["handle"]
+ )
if not mod_locs:
- LOG.warning("Could not find module named %s (searched %s)",
- mod_name, looked_locs)
+ LOG.warning(
+ "Could not find module named %s (searched %s)",
+ mod_name,
+ looked_locs,
+ )
continue
mod = config.fixup_module(importer.import_module(mod_locs[0]))
mostly_mods.append([mod, raw_name, freq, run_args])
@@ -826,15 +1063,15 @@ class Modules(object):
freq = mod.frequency
if freq not in FREQUENCIES:
freq = PER_INSTANCE
- LOG.debug("Running module %s (%s) with frequency %s",
- name, mod, freq)
+ LOG.debug(
+ "Running module %s (%s) with frequency %s", name, mod, freq
+ )
# Use the configs logger and not our own
# TODO(harlowja): possibly check the module
# for having a LOG attr and just give it back
# its own logger?
- func_args = [name, self.cfg,
- cc, config.LOG, args]
+ func_args = [name, self.cfg, cc, config.LOG, args]
# Mark it as having started running
which_ran.append(name)
# This name will affect the semaphore name created
@@ -842,11 +1079,13 @@ class Modules(object):
desc = "running %s with frequency %s" % (run_name, freq)
myrep = events.ReportEventStack(
- name=run_name, description=desc, parent=self.reporter)
+ name=run_name, description=desc, parent=self.reporter
+ )
with myrep:
- ran, _r = cc.run(run_name, mod.handle, func_args,
- freq=freq)
+ ran, _r = cc.run(
+ run_name, mod.handle, func_args, freq=freq
+ )
if ran:
myrep.message = "%s ran successfully" % run_name
else:
@@ -860,9 +1099,9 @@ class Modules(object):
def run_single(self, mod_name, args=None, freq=None):
# Form the user's module 'specs'
mod_to_be = {
- 'mod': mod_name,
- 'args': args,
- 'freq': freq,
+ "mod": mod_name,
+ "args": args,
+ "freq": freq,
}
# Now resume doing the normal fixups and running
raw_mods = [mod_to_be]
@@ -876,13 +1115,14 @@ class Modules(object):
skipped = []
forced = []
- overridden = self.cfg.get('unverified_modules', [])
+ overridden = self.cfg.get("unverified_modules", [])
active_mods = []
all_distros = set([distros.ALL_DISTROS])
for (mod, name, _freq, _args) in mostly_mods:
worked_distros = set(mod.distros) # Minimally [] per fixup_modules
worked_distros.update(
- distros.Distro.expand_osfamily(mod.osfamilies))
+ distros.Distro.expand_osfamily(mod.osfamilies)
+ )
# Skip only when the following conditions are all met:
# - distros are defined in the module != ALL_DISTROS
@@ -898,12 +1138,15 @@ class Modules(object):
active_mods.append([mod, name, _freq, _args])
if skipped:
- LOG.info("Skipping modules '%s' because they are not verified "
- "on distro '%s'. To run anyway, add them to "
- "'unverified_modules' in config.",
- ','.join(skipped), d_name)
+ LOG.info(
+ "Skipping modules '%s' because they are not verified "
+ "on distro '%s'. To run anyway, add them to "
+ "'unverified_modules' in config.",
+ ",".join(skipped),
+ d_name,
+ )
if forced:
- LOG.info("running unverified_modules: '%s'", ', '.join(forced))
+ LOG.info("running unverified_modules: '%s'", ", ".join(forced))
return self._run_modules(active_mods)
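[Sketch, not part of the patch: forcing a module that is not verified for the running distro, per the log message above; the module name is a hypothetical choice.]

    cfg = {"unverified_modules": ["grub_dpkg"]}
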
@@ -923,7 +1166,9 @@ def fetch_base_config():
read_runtime_config(),
# Kernel/cmdline parameters override system config
util.read_conf_from_cmdline(),
- ], reverse=True)
+ ],
+ reverse=True,
+ )
def _pkl_store(obj, fname):
@@ -953,8 +1198,11 @@ def _pkl_load(fname):
return None
try:
return pickle.loads(pickle_contents)
+ except sources.DatasourceUnpickleUserDataError:
+ return None
except Exception:
util.logexc(LOG, "Failed loading pickled blob from %s", fname)
return None
+
# vi: ts=4 expandtab