diff options
Diffstat (limited to 'cloudinit/distros/ug_util.py')
-rwxr-xr-x | cloudinit/distros/ug_util.py | 290 |
1 files changed, 127 insertions, 163 deletions
diff --git a/cloudinit/distros/ug_util.py b/cloudinit/distros/ug_util.py index 08446a95..72766392 100755 --- a/cloudinit/distros/ug_util.py +++ b/cloudinit/distros/ug_util.py @@ -10,92 +10,80 @@ # This file is part of cloud-init. See LICENSE file for license information. from cloudinit import log as logging -from cloudinit import type_utils -from cloudinit import util +from cloudinit import type_utils, util LOG = logging.getLogger(__name__) -# Normalizes a input group configuration -# which can be a comma seperated list of -# group names, or a list of group names -# or a python dictionary of group names -# to a list of members of that group. +# Normalizes an input group configuration which can be: +# Comma seperated string or a list or a dictionary # -# The output is a dictionary of group -# names => members of that group which -# is the standard form used in the rest -# of cloud-init +# Returns dictionary of group names => members of that group which is the +# standard form used in the rest of cloud-init def _normalize_groups(grp_cfg): if isinstance(grp_cfg, str): grp_cfg = grp_cfg.strip().split(",") + if isinstance(grp_cfg, list): c_grp_cfg = {} for i in grp_cfg: if isinstance(i, dict): for k, v in i.items(): - if k not in c_grp_cfg: - if isinstance(v, list): - c_grp_cfg[k] = list(v) - elif isinstance(v, str): - c_grp_cfg[k] = [v] - else: - raise TypeError("Bad group member type %s" % - type_utils.obj_name(v)) + if not isinstance(v, (list, str)): + raise TypeError( + "Bad group member type %s" + % (type_utils.obj_name(v)) + ) + + if isinstance(v, list): + c_grp_cfg.setdefault(k, []).extend(v) else: - if isinstance(v, list): - c_grp_cfg[k].extend(v) - elif isinstance(v, str): - c_grp_cfg[k].append(v) - else: - raise TypeError("Bad group member type %s" % - type_utils.obj_name(v)) + c_grp_cfg.setdefault(k, []).append(v) elif isinstance(i, str): if i not in c_grp_cfg: c_grp_cfg[i] = [] else: - raise TypeError("Unknown group name type %s" % - type_utils.obj_name(i)) + raise TypeError( + "Unknown group name type %s" % (type_utils.obj_name(i)) + ) grp_cfg = c_grp_cfg + groups = {} if isinstance(grp_cfg, dict): - for (grp_name, grp_members) in grp_cfg.items(): + for grp_name, grp_members in grp_cfg.items(): groups[grp_name] = util.uniq_merge_sorted(grp_members) else: - raise TypeError(("Group config must be list, dict " - " or string types only and not %s") % - type_utils.obj_name(grp_cfg)) + raise TypeError( + "Group config must be list, dict or string type only but found %s" + % (type_utils.obj_name(grp_cfg)) + ) return groups -# Normalizes a input group configuration -# which can be a comma seperated list of -# user names, or a list of string user names -# or a list of dictionaries with components -# that define the user config + 'name' (if -# a 'name' field does not exist then the -# default user is assumed to 'own' that -# configuration. +# Normalizes an input group configuration which can be: a list or a dictionary +# +# components that define the user config + 'name' (if a 'name' field does not +# exist then the default user is assumed to 'own' that configuration.) # -# The output is a dictionary of user -# names => user config which is the standard -# form used in the rest of cloud-init. Note -# the default user will have a special config -# entry 'default' which will be marked as true -# all other users will be marked as false. +# Returns a dictionary of user names => user config which is the standard form +# used in the rest of cloud-init. Note the default user will have a special +# config entry 'default' which will be marked true and all other users will be +# marked false. def _normalize_users(u_cfg, def_user_cfg=None): if isinstance(u_cfg, dict): ad_ucfg = [] - for (k, v) in u_cfg.items(): + for k, v in u_cfg.items(): if isinstance(v, (bool, int, float, str)): if util.is_true(v): ad_ucfg.append(str(k)) elif isinstance(v, dict): - v['name'] = k + v["name"] = k ad_ucfg.append(v) else: - raise TypeError(("Unmappable user value type %s" - " for key %s") % (type_utils.obj_name(v), k)) + raise TypeError( + "Unmappable user value type %s for key %s" + % (type_utils.obj_name(v), k) + ) u_cfg = ad_ucfg elif isinstance(u_cfg, str): u_cfg = util.uniq_merge_sorted(u_cfg) @@ -107,181 +95,157 @@ def _normalize_users(u_cfg, def_user_cfg=None): if u and u not in users: users[u] = {} elif isinstance(user_config, dict): - if 'name' in user_config: - n = user_config.pop('name') - prev_config = users.get(n) or {} - users[n] = util.mergemanydict([prev_config, - user_config]) - else: - # Assume the default user then - prev_config = users.get('default') or {} - users['default'] = util.mergemanydict([prev_config, - user_config]) + n = user_config.pop("name", "default") + prev_config = users.get(n) or {} + users[n] = util.mergemanydict([prev_config, user_config]) else: - raise TypeError(("User config must be dictionary/list " - " or string types only and not %s") % - type_utils.obj_name(user_config)) + raise TypeError( + "User config must be dictionary/list or string " + " types only and not %s" % (type_utils.obj_name(user_config)) + ) # Ensure user options are in the right python friendly format if users: c_users = {} - for (uname, uconfig) in users.items(): + for uname, uconfig in users.items(): c_uconfig = {} - for (k, v) in uconfig.items(): - k = k.replace('-', '_').strip() + for k, v in uconfig.items(): + k = k.replace("-", "_").strip() if k: c_uconfig[k] = v c_users[uname] = c_uconfig users = c_users - # Fixup the default user into the real - # default user name and replace it... + # Fix the default user into the actual default user name and replace it. def_user = None - if users and 'default' in users: - def_config = users.pop('default') + if users and "default" in users: + def_config = users.pop("default") if def_user_cfg: - # Pickup what the default 'real name' is - # and any groups that are provided by the - # default config + # Pickup what the default 'real name' is and any groups that are + # provided by the default config def_user_cfg = def_user_cfg.copy() - def_user = def_user_cfg.pop('name') - def_groups = def_user_cfg.pop('groups', []) - # Pickup any config + groups for that user name - # that we may have previously extracted + def_user = def_user_cfg.pop("name") + def_groups = def_user_cfg.pop("groups", []) + # Pick any config + groups for the user name that we may have + # extracted previously parsed_config = users.pop(def_user, {}) - parsed_groups = parsed_config.get('groups', []) - # Now merge our extracted groups with - # anything the default config provided + parsed_groups = parsed_config.get("groups", []) + # Now merge the extracted groups with the default config provided users_groups = util.uniq_merge_sorted(parsed_groups, def_groups) - parsed_config['groups'] = ",".join(users_groups) - # The real config for the default user is the - # combination of the default user config provided - # by the distro, the default user config provided - # by the above merging for the user 'default' and - # then the parsed config from the user's 'real name' - # which does not have to be 'default' (but could be) - users[def_user] = util.mergemanydict([def_user_cfg, - def_config, - parsed_config]) - - # Ensure that only the default user that we - # found (if any) is actually marked as being - # the default user - if users: - for (uname, uconfig) in users.items(): - if def_user and uname == def_user: - uconfig['default'] = True - else: - uconfig['default'] = False + parsed_config["groups"] = ",".join(users_groups) + # The real config for the default user is the combination of the + # default user config provided by the distro, the default user + # config provided by the above merging for the user 'default' and + # then the parsed config from the user's 'real name' which does not + # have to be 'default' (but could be) + users[def_user] = util.mergemanydict( + [def_user_cfg, def_config, parsed_config] + ) + + # Ensure that only the default user that we found (if any) is actually + # marked as the default user + for uname, uconfig in users.items(): + uconfig["default"] = uname == def_user if def_user else False return users -# Normalizes a set of user/users and group -# dictionary configuration into a useable -# format that the rest of cloud-init can -# understand using the default user -# provided by the input distrobution (if any) -# to allow for mapping of the 'default' user. +# Normalizes a set of user/users and group dictionary configuration into an +# usable format so that the rest of cloud-init can understand using the default +# user provided by the input distribution (if any) to allow mapping of the +# 'default' user. # # Output is a dictionary of group names -> [member] (list) # and a dictionary of user names -> user configuration (dict) # -# If 'user' exists it will override -# the 'users'[0] entry (if a list) otherwise it will -# just become an entry in the returned dictionary (no override) +# If 'user' exists, it will override +# The 'users'[0] entry (if a list) otherwise it will just become an entry in +# the returned dictionary (no override) def normalize_users_groups(cfg, distro): if not cfg: cfg = {} - users = {} - groups = {} - if 'groups' in cfg: - groups = _normalize_groups(cfg['groups']) - # Handle the previous style of doing this where the first user # overrides the concept of the default user if provided in the user: XYZ # format. old_user = {} - if 'user' in cfg and cfg['user']: - old_user = cfg['user'] - # Translate it into the format that is more useful - # going forward + if "user" in cfg and cfg["user"]: + old_user = cfg["user"] + # Translate it into a format that will be more useful going forward if isinstance(old_user, str): - old_user = { - 'name': old_user, - } - if not isinstance(old_user, dict): - LOG.warning(("Format for 'user' key must be a string or dictionary" - " and not %s"), type_utils.obj_name(old_user)) + old_user = {"name": old_user} + elif not isinstance(old_user, dict): + LOG.warning( + "Format for 'user' key must be a string or dictionary" + " and not %s", + type_utils.obj_name(old_user), + ) old_user = {} - # If no old user format, then assume the distro - # provides what the 'default' user maps to, but notice - # that if this is provided, we won't automatically inject - # a 'default' user into the users list, while if a old user - # format is provided we will. + # If no old user format, then assume the distro provides what the 'default' + # user maps to, but notice that if this is provided, we won't automatically + # inject a 'default' user into the users list, while if an old user format + # is provided we will. distro_user_config = {} try: distro_user_config = distro.get_default_user() except NotImplementedError: - LOG.warning(("Distro has not implemented default user " - "access. No distribution provided default user" - " will be normalized.")) - - # Merge the old user (which may just be an empty dict when not - # present with the distro provided default user configuration so - # that the old user style picks up all the distribution specific - # attributes (if any) + LOG.warning( + "Distro has not implemented default user access. No " + "distribution provided default user will be normalized." + ) + + # Merge the old user (which may just be an empty dict when not present) + # with the distro provided default user configuration so that the old user + # style picks up all the distribution specific attributes (if any) default_user_config = util.mergemanydict([old_user, distro_user_config]) - base_users = cfg.get('users', []) + base_users = cfg.get("users", []) if not isinstance(base_users, (list, dict, str)): - LOG.warning(("Format for 'users' key must be a comma separated string" - " or a dictionary or a list and not %s"), - type_utils.obj_name(base_users)) + LOG.warning( + "Format for 'users' key must be a comma separated string" + " or a dictionary or a list but found %s", + type_utils.obj_name(base_users), + ) base_users = [] if old_user: - # Ensure that when user: is provided that this user - # always gets added (as the default user) + # When 'user:' is provided, it should be made as the default user if isinstance(base_users, list): - # Just add it on at the end... - base_users.append({'name': 'default'}) + base_users.append({"name": "default"}) elif isinstance(base_users, dict): - base_users['default'] = dict(base_users).get('default', True) + base_users["default"] = dict(base_users).get("default", True) elif isinstance(base_users, str): - # Just append it on to be re-parsed later base_users += ",default" + groups = {} + if "groups" in cfg: + groups = _normalize_groups(cfg["groups"]) + users = _normalize_users(base_users, default_user_config) return (users, groups) -# Given a user dictionary config it will -# extract the default user name and user config -# from that list and return that tuple or -# return (None, None) if no default user is -# found in the given input +# Given a user dictionary config, extract the default user name and user config +# and return them or return (None, None) if no default user is found def extract_default(users, default_name=None, default_config=None): if not users: - users = {} + return (default_name, default_config) def safe_find(entry): config = entry[1] - if not config or 'default' not in config: + if not config or "default" not in config: return False - else: - return config['default'] + return config["default"] - tmp_users = users.items() - tmp_users = dict(filter(safe_find, tmp_users)) + tmp_users = dict(filter(safe_find, users.items())) if not tmp_users: return (default_name, default_config) - else: - name = list(tmp_users)[0] - config = tmp_users[name] - config.pop('default', None) - return (name, config) + + name = list(tmp_users)[0] + config = tmp_users[name] + config.pop("default", None) + return (name, config) + # vi: ts=4 expandtab |