From f3cc94949d9f153b4a5135f8b989ff11b36ab7ea Mon Sep 17 00:00:00 2001 From: Shreenidhi Shedi <53473811+sshedi@users.noreply.github.com> Date: Tue, 14 Sep 2021 01:11:45 +0530 Subject: Improve ug_util.py (#1013) No functional changes. --- cloudinit/distros/ug_util.py | 234 +++++++++++++++++-------------------------- 1 file changed, 91 insertions(+), 143 deletions(-) (limited to 'cloudinit/distros') diff --git a/cloudinit/distros/ug_util.py b/cloudinit/distros/ug_util.py index 08446a95..600b743f 100755 --- a/cloudinit/distros/ug_util.py +++ b/cloudinit/distros/ug_util.py @@ -16,77 +16,59 @@ from cloudinit import util LOG = logging.getLogger(__name__) -# Normalizes a input group configuration -# which can be a comma seperated list of -# group names, or a list of group names -# or a python dictionary of group names -# to a list of members of that group. +# Normalizes an input group configuration which can be: +# Comma seperated string or a list or a dictionary # -# The output is a dictionary of group -# names => members of that group which -# is the standard form used in the rest -# of cloud-init +# Returns dictionary of group names => members of that group which is the +# standard form used in the rest of cloud-init def _normalize_groups(grp_cfg): if isinstance(grp_cfg, str): - grp_cfg = grp_cfg.strip().split(",") + grp_cfg = grp_cfg.strip().split(',') + if isinstance(grp_cfg, list): c_grp_cfg = {} for i in grp_cfg: if isinstance(i, dict): for k, v in i.items(): - if k not in c_grp_cfg: - if isinstance(v, list): - c_grp_cfg[k] = list(v) - elif isinstance(v, str): - c_grp_cfg[k] = [v] - else: - raise TypeError("Bad group member type %s" % - type_utils.obj_name(v)) + if not isinstance(v, (list, str)): + raise TypeError('Bad group member type %s' + % (type_utils.obj_name(v))) + + if isinstance(v, list): + c_grp_cfg.setdefault(k, []).extend(v) else: - if isinstance(v, list): - c_grp_cfg[k].extend(v) - elif isinstance(v, str): - c_grp_cfg[k].append(v) - else: - raise TypeError("Bad group member type %s" % - type_utils.obj_name(v)) + c_grp_cfg.setdefault(k, []).append(v) elif isinstance(i, str): if i not in c_grp_cfg: c_grp_cfg[i] = [] else: - raise TypeError("Unknown group name type %s" % - type_utils.obj_name(i)) + raise TypeError('Unknown group name type %s' + % (type_utils.obj_name(i))) grp_cfg = c_grp_cfg + groups = {} if isinstance(grp_cfg, dict): - for (grp_name, grp_members) in grp_cfg.items(): + for grp_name, grp_members in grp_cfg.items(): groups[grp_name] = util.uniq_merge_sorted(grp_members) else: - raise TypeError(("Group config must be list, dict " - " or string types only and not %s") % - type_utils.obj_name(grp_cfg)) + raise TypeError(('Group config must be list, dict or string type only ' + 'but found %s') % (type_utils.obj_name(grp_cfg))) return groups -# Normalizes a input group configuration -# which can be a comma seperated list of -# user names, or a list of string user names -# or a list of dictionaries with components -# that define the user config + 'name' (if -# a 'name' field does not exist then the -# default user is assumed to 'own' that -# configuration. +# Normalizes an input group configuration which can be: a list or a dictionary # -# The output is a dictionary of user -# names => user config which is the standard -# form used in the rest of cloud-init. Note -# the default user will have a special config -# entry 'default' which will be marked as true -# all other users will be marked as false. +# components that define the user config + 'name' (if a 'name' field does not +# exist then the default user is assumed to 'own' that configuration.) +# +# Returns a dictionary of user names => user config which is the standard form +# used in the rest of cloud-init. Note the default user will have a special +# config entry 'default' which will be marked true and all other users will be +# marked false. def _normalize_users(u_cfg, def_user_cfg=None): if isinstance(u_cfg, dict): ad_ucfg = [] - for (k, v) in u_cfg.items(): + for k, v in u_cfg.items(): if isinstance(v, (bool, int, float, str)): if util.is_true(v): ad_ucfg.append(str(k)) @@ -94,8 +76,8 @@ def _normalize_users(u_cfg, def_user_cfg=None): v['name'] = k ad_ucfg.append(v) else: - raise TypeError(("Unmappable user value type %s" - " for key %s") % (type_utils.obj_name(v), k)) + raise TypeError(('Unmappable user value type %s for key %s') + % (type_utils.obj_name(v), k)) u_cfg = ad_ucfg elif isinstance(u_cfg, str): u_cfg = util.uniq_merge_sorted(u_cfg) @@ -107,181 +89,147 @@ def _normalize_users(u_cfg, def_user_cfg=None): if u and u not in users: users[u] = {} elif isinstance(user_config, dict): - if 'name' in user_config: - n = user_config.pop('name') - prev_config = users.get(n) or {} - users[n] = util.mergemanydict([prev_config, - user_config]) - else: - # Assume the default user then - prev_config = users.get('default') or {} - users['default'] = util.mergemanydict([prev_config, - user_config]) + n = user_config.pop('name', 'default') + prev_config = users.get(n) or {} + users[n] = util.mergemanydict([prev_config, user_config]) else: - raise TypeError(("User config must be dictionary/list " - " or string types only and not %s") % - type_utils.obj_name(user_config)) + raise TypeError(('User config must be dictionary/list or string ' + ' types only and not %s') + % (type_utils.obj_name(user_config))) # Ensure user options are in the right python friendly format if users: c_users = {} - for (uname, uconfig) in users.items(): + for uname, uconfig in users.items(): c_uconfig = {} - for (k, v) in uconfig.items(): + for k, v in uconfig.items(): k = k.replace('-', '_').strip() if k: c_uconfig[k] = v c_users[uname] = c_uconfig users = c_users - # Fixup the default user into the real - # default user name and replace it... + # Fix the default user into the actual default user name and replace it. def_user = None if users and 'default' in users: def_config = users.pop('default') if def_user_cfg: - # Pickup what the default 'real name' is - # and any groups that are provided by the - # default config + # Pickup what the default 'real name' is and any groups that are + # provided by the default config def_user_cfg = def_user_cfg.copy() def_user = def_user_cfg.pop('name') def_groups = def_user_cfg.pop('groups', []) - # Pickup any config + groups for that user name - # that we may have previously extracted + # Pick any config + groups for the user name that we may have + # extracted previously parsed_config = users.pop(def_user, {}) parsed_groups = parsed_config.get('groups', []) - # Now merge our extracted groups with - # anything the default config provided + # Now merge the extracted groups with the default config provided users_groups = util.uniq_merge_sorted(parsed_groups, def_groups) - parsed_config['groups'] = ",".join(users_groups) - # The real config for the default user is the - # combination of the default user config provided - # by the distro, the default user config provided - # by the above merging for the user 'default' and - # then the parsed config from the user's 'real name' - # which does not have to be 'default' (but could be) - users[def_user] = util.mergemanydict([def_user_cfg, - def_config, + parsed_config['groups'] = ','.join(users_groups) + # The real config for the default user is the combination of the + # default user config provided by the distro, the default user + # config provided by the above merging for the user 'default' and + # then the parsed config from the user's 'real name' which does not + # have to be 'default' (but could be) + users[def_user] = util.mergemanydict([def_user_cfg, def_config, parsed_config]) - # Ensure that only the default user that we - # found (if any) is actually marked as being - # the default user - if users: - for (uname, uconfig) in users.items(): - if def_user and uname == def_user: - uconfig['default'] = True - else: - uconfig['default'] = False + # Ensure that only the default user that we found (if any) is actually + # marked as the default user + for uname, uconfig in users.items(): + uconfig['default'] = (uname == def_user if def_user else False) return users -# Normalizes a set of user/users and group -# dictionary configuration into a useable -# format that the rest of cloud-init can -# understand using the default user -# provided by the input distrobution (if any) -# to allow for mapping of the 'default' user. +# Normalizes a set of user/users and group dictionary configuration into an +# usable format so that the rest of cloud-init can understand using the default +# user provided by the input distribution (if any) to allow mapping of the +# 'default' user. # # Output is a dictionary of group names -> [member] (list) # and a dictionary of user names -> user configuration (dict) # -# If 'user' exists it will override -# the 'users'[0] entry (if a list) otherwise it will -# just become an entry in the returned dictionary (no override) +# If 'user' exists, it will override +# The 'users'[0] entry (if a list) otherwise it will just become an entry in +# the returned dictionary (no override) def normalize_users_groups(cfg, distro): if not cfg: cfg = {} - users = {} - groups = {} - if 'groups' in cfg: - groups = _normalize_groups(cfg['groups']) - # Handle the previous style of doing this where the first user # overrides the concept of the default user if provided in the user: XYZ # format. old_user = {} if 'user' in cfg and cfg['user']: old_user = cfg['user'] - # Translate it into the format that is more useful - # going forward + # Translate it into a format that will be more useful going forward if isinstance(old_user, str): - old_user = { - 'name': old_user, - } - if not isinstance(old_user, dict): + old_user = {'name': old_user} + elif not isinstance(old_user, dict): LOG.warning(("Format for 'user' key must be a string or dictionary" " and not %s"), type_utils.obj_name(old_user)) old_user = {} - # If no old user format, then assume the distro - # provides what the 'default' user maps to, but notice - # that if this is provided, we won't automatically inject - # a 'default' user into the users list, while if a old user - # format is provided we will. + # If no old user format, then assume the distro provides what the 'default' + # user maps to, but notice that if this is provided, we won't automatically + # inject a 'default' user into the users list, while if an old user format + # is provided we will. distro_user_config = {} try: distro_user_config = distro.get_default_user() except NotImplementedError: - LOG.warning(("Distro has not implemented default user " - "access. No distribution provided default user" - " will be normalized.")) + LOG.warning(('Distro has not implemented default user access. No ' + 'distribution provided default user will be normalized.')) - # Merge the old user (which may just be an empty dict when not - # present with the distro provided default user configuration so - # that the old user style picks up all the distribution specific - # attributes (if any) + # Merge the old user (which may just be an empty dict when not present) + # with the distro provided default user configuration so that the old user + # style picks up all the distribution specific attributes (if any) default_user_config = util.mergemanydict([old_user, distro_user_config]) base_users = cfg.get('users', []) if not isinstance(base_users, (list, dict, str)): LOG.warning(("Format for 'users' key must be a comma separated string" - " or a dictionary or a list and not %s"), + " or a dictionary or a list but found %s"), type_utils.obj_name(base_users)) base_users = [] if old_user: - # Ensure that when user: is provided that this user - # always gets added (as the default user) + # When 'user:' is provided, it should be made as the default user if isinstance(base_users, list): - # Just add it on at the end... base_users.append({'name': 'default'}) elif isinstance(base_users, dict): base_users['default'] = dict(base_users).get('default', True) elif isinstance(base_users, str): - # Just append it on to be re-parsed later - base_users += ",default" + base_users += ',default' + + groups = {} + if 'groups' in cfg: + groups = _normalize_groups(cfg['groups']) users = _normalize_users(base_users, default_user_config) return (users, groups) -# Given a user dictionary config it will -# extract the default user name and user config -# from that list and return that tuple or -# return (None, None) if no default user is -# found in the given input +# Given a user dictionary config, extract the default user name and user config +# and return them or return (None, None) if no default user is found def extract_default(users, default_name=None, default_config=None): if not users: - users = {} + return (default_name, default_config) def safe_find(entry): config = entry[1] if not config or 'default' not in config: return False - else: - return config['default'] + return config['default'] - tmp_users = users.items() - tmp_users = dict(filter(safe_find, tmp_users)) + tmp_users = dict(filter(safe_find, users.items())) if not tmp_users: return (default_name, default_config) - else: - name = list(tmp_users)[0] - config = tmp_users[name] - config.pop('default', None) - return (name, config) + + name = list(tmp_users)[0] + config = tmp_users[name] + config.pop('default', None) + return (name, config) # vi: ts=4 expandtab -- cgit v1.2.3