summaryrefslogtreecommitdiff
path: root/cloudinit/distros
diff options
context:
space:
mode:
authorShreenidhi Shedi <53473811+sshedi@users.noreply.github.com>2021-09-14 01:11:45 +0530
committerGitHub <noreply@github.com>2021-09-13 14:41:45 -0500
commitf3cc94949d9f153b4a5135f8b989ff11b36ab7ea (patch)
treec22cead26514ce3904fe672efc7ab7ca7eb2c963 /cloudinit/distros
parentf4c47e3e25d1fb79e2673e37f8fc67750d025be2 (diff)
downloadvyos-cloud-init-f3cc94949d9f153b4a5135f8b989ff11b36ab7ea.tar.gz
vyos-cloud-init-f3cc94949d9f153b4a5135f8b989ff11b36ab7ea.zip
Improve ug_util.py (#1013)
No functional changes.
Diffstat (limited to 'cloudinit/distros')
-rwxr-xr-xcloudinit/distros/ug_util.py234
1 files changed, 91 insertions, 143 deletions
diff --git a/cloudinit/distros/ug_util.py b/cloudinit/distros/ug_util.py
index 08446a95..600b743f 100755
--- a/cloudinit/distros/ug_util.py
+++ b/cloudinit/distros/ug_util.py
@@ -16,77 +16,59 @@ from cloudinit import util
LOG = logging.getLogger(__name__)
-# Normalizes a input group configuration
-# which can be a comma seperated list of
-# group names, or a list of group names
-# or a python dictionary of group names
-# to a list of members of that group.
+# Normalizes an input group configuration which can be:
+# Comma seperated string or a list or a dictionary
#
-# The output is a dictionary of group
-# names => members of that group which
-# is the standard form used in the rest
-# of cloud-init
+# Returns dictionary of group names => members of that group which is the
+# standard form used in the rest of cloud-init
def _normalize_groups(grp_cfg):
if isinstance(grp_cfg, str):
- grp_cfg = grp_cfg.strip().split(",")
+ grp_cfg = grp_cfg.strip().split(',')
+
if isinstance(grp_cfg, list):
c_grp_cfg = {}
for i in grp_cfg:
if isinstance(i, dict):
for k, v in i.items():
- if k not in c_grp_cfg:
- if isinstance(v, list):
- c_grp_cfg[k] = list(v)
- elif isinstance(v, str):
- c_grp_cfg[k] = [v]
- else:
- raise TypeError("Bad group member type %s" %
- type_utils.obj_name(v))
+ if not isinstance(v, (list, str)):
+ raise TypeError('Bad group member type %s'
+ % (type_utils.obj_name(v)))
+
+ if isinstance(v, list):
+ c_grp_cfg.setdefault(k, []).extend(v)
else:
- if isinstance(v, list):
- c_grp_cfg[k].extend(v)
- elif isinstance(v, str):
- c_grp_cfg[k].append(v)
- else:
- raise TypeError("Bad group member type %s" %
- type_utils.obj_name(v))
+ c_grp_cfg.setdefault(k, []).append(v)
elif isinstance(i, str):
if i not in c_grp_cfg:
c_grp_cfg[i] = []
else:
- raise TypeError("Unknown group name type %s" %
- type_utils.obj_name(i))
+ raise TypeError('Unknown group name type %s'
+ % (type_utils.obj_name(i)))
grp_cfg = c_grp_cfg
+
groups = {}
if isinstance(grp_cfg, dict):
- for (grp_name, grp_members) in grp_cfg.items():
+ for grp_name, grp_members in grp_cfg.items():
groups[grp_name] = util.uniq_merge_sorted(grp_members)
else:
- raise TypeError(("Group config must be list, dict "
- " or string types only and not %s") %
- type_utils.obj_name(grp_cfg))
+ raise TypeError(('Group config must be list, dict or string type only '
+ 'but found %s') % (type_utils.obj_name(grp_cfg)))
return groups
-# Normalizes a input group configuration
-# which can be a comma seperated list of
-# user names, or a list of string user names
-# or a list of dictionaries with components
-# that define the user config + 'name' (if
-# a 'name' field does not exist then the
-# default user is assumed to 'own' that
-# configuration.
+# Normalizes an input group configuration which can be: a list or a dictionary
#
-# The output is a dictionary of user
-# names => user config which is the standard
-# form used in the rest of cloud-init. Note
-# the default user will have a special config
-# entry 'default' which will be marked as true
-# all other users will be marked as false.
+# components that define the user config + 'name' (if a 'name' field does not
+# exist then the default user is assumed to 'own' that configuration.)
+#
+# Returns a dictionary of user names => user config which is the standard form
+# used in the rest of cloud-init. Note the default user will have a special
+# config entry 'default' which will be marked true and all other users will be
+# marked false.
def _normalize_users(u_cfg, def_user_cfg=None):
if isinstance(u_cfg, dict):
ad_ucfg = []
- for (k, v) in u_cfg.items():
+ for k, v in u_cfg.items():
if isinstance(v, (bool, int, float, str)):
if util.is_true(v):
ad_ucfg.append(str(k))
@@ -94,8 +76,8 @@ def _normalize_users(u_cfg, def_user_cfg=None):
v['name'] = k
ad_ucfg.append(v)
else:
- raise TypeError(("Unmappable user value type %s"
- " for key %s") % (type_utils.obj_name(v), k))
+ raise TypeError(('Unmappable user value type %s for key %s')
+ % (type_utils.obj_name(v), k))
u_cfg = ad_ucfg
elif isinstance(u_cfg, str):
u_cfg = util.uniq_merge_sorted(u_cfg)
@@ -107,181 +89,147 @@ def _normalize_users(u_cfg, def_user_cfg=None):
if u and u not in users:
users[u] = {}
elif isinstance(user_config, dict):
- if 'name' in user_config:
- n = user_config.pop('name')
- prev_config = users.get(n) or {}
- users[n] = util.mergemanydict([prev_config,
- user_config])
- else:
- # Assume the default user then
- prev_config = users.get('default') or {}
- users['default'] = util.mergemanydict([prev_config,
- user_config])
+ n = user_config.pop('name', 'default')
+ prev_config = users.get(n) or {}
+ users[n] = util.mergemanydict([prev_config, user_config])
else:
- raise TypeError(("User config must be dictionary/list "
- " or string types only and not %s") %
- type_utils.obj_name(user_config))
+ raise TypeError(('User config must be dictionary/list or string '
+ ' types only and not %s')
+ % (type_utils.obj_name(user_config)))
# Ensure user options are in the right python friendly format
if users:
c_users = {}
- for (uname, uconfig) in users.items():
+ for uname, uconfig in users.items():
c_uconfig = {}
- for (k, v) in uconfig.items():
+ for k, v in uconfig.items():
k = k.replace('-', '_').strip()
if k:
c_uconfig[k] = v
c_users[uname] = c_uconfig
users = c_users
- # Fixup the default user into the real
- # default user name and replace it...
+ # Fix the default user into the actual default user name and replace it.
def_user = None
if users and 'default' in users:
def_config = users.pop('default')
if def_user_cfg:
- # Pickup what the default 'real name' is
- # and any groups that are provided by the
- # default config
+ # Pickup what the default 'real name' is and any groups that are
+ # provided by the default config
def_user_cfg = def_user_cfg.copy()
def_user = def_user_cfg.pop('name')
def_groups = def_user_cfg.pop('groups', [])
- # Pickup any config + groups for that user name
- # that we may have previously extracted
+ # Pick any config + groups for the user name that we may have
+ # extracted previously
parsed_config = users.pop(def_user, {})
parsed_groups = parsed_config.get('groups', [])
- # Now merge our extracted groups with
- # anything the default config provided
+ # Now merge the extracted groups with the default config provided
users_groups = util.uniq_merge_sorted(parsed_groups, def_groups)
- parsed_config['groups'] = ",".join(users_groups)
- # The real config for the default user is the
- # combination of the default user config provided
- # by the distro, the default user config provided
- # by the above merging for the user 'default' and
- # then the parsed config from the user's 'real name'
- # which does not have to be 'default' (but could be)
- users[def_user] = util.mergemanydict([def_user_cfg,
- def_config,
+ parsed_config['groups'] = ','.join(users_groups)
+ # The real config for the default user is the combination of the
+ # default user config provided by the distro, the default user
+ # config provided by the above merging for the user 'default' and
+ # then the parsed config from the user's 'real name' which does not
+ # have to be 'default' (but could be)
+ users[def_user] = util.mergemanydict([def_user_cfg, def_config,
parsed_config])
- # Ensure that only the default user that we
- # found (if any) is actually marked as being
- # the default user
- if users:
- for (uname, uconfig) in users.items():
- if def_user and uname == def_user:
- uconfig['default'] = True
- else:
- uconfig['default'] = False
+ # Ensure that only the default user that we found (if any) is actually
+ # marked as the default user
+ for uname, uconfig in users.items():
+ uconfig['default'] = (uname == def_user if def_user else False)
return users
-# Normalizes a set of user/users and group
-# dictionary configuration into a useable
-# format that the rest of cloud-init can
-# understand using the default user
-# provided by the input distrobution (if any)
-# to allow for mapping of the 'default' user.
+# Normalizes a set of user/users and group dictionary configuration into an
+# usable format so that the rest of cloud-init can understand using the default
+# user provided by the input distribution (if any) to allow mapping of the
+# 'default' user.
#
# Output is a dictionary of group names -> [member] (list)
# and a dictionary of user names -> user configuration (dict)
#
-# If 'user' exists it will override
-# the 'users'[0] entry (if a list) otherwise it will
-# just become an entry in the returned dictionary (no override)
+# If 'user' exists, it will override
+# The 'users'[0] entry (if a list) otherwise it will just become an entry in
+# the returned dictionary (no override)
def normalize_users_groups(cfg, distro):
if not cfg:
cfg = {}
- users = {}
- groups = {}
- if 'groups' in cfg:
- groups = _normalize_groups(cfg['groups'])
-
# Handle the previous style of doing this where the first user
# overrides the concept of the default user if provided in the user: XYZ
# format.
old_user = {}
if 'user' in cfg and cfg['user']:
old_user = cfg['user']
- # Translate it into the format that is more useful
- # going forward
+ # Translate it into a format that will be more useful going forward
if isinstance(old_user, str):
- old_user = {
- 'name': old_user,
- }
- if not isinstance(old_user, dict):
+ old_user = {'name': old_user}
+ elif not isinstance(old_user, dict):
LOG.warning(("Format for 'user' key must be a string or dictionary"
" and not %s"), type_utils.obj_name(old_user))
old_user = {}
- # If no old user format, then assume the distro
- # provides what the 'default' user maps to, but notice
- # that if this is provided, we won't automatically inject
- # a 'default' user into the users list, while if a old user
- # format is provided we will.
+ # If no old user format, then assume the distro provides what the 'default'
+ # user maps to, but notice that if this is provided, we won't automatically
+ # inject a 'default' user into the users list, while if an old user format
+ # is provided we will.
distro_user_config = {}
try:
distro_user_config = distro.get_default_user()
except NotImplementedError:
- LOG.warning(("Distro has not implemented default user "
- "access. No distribution provided default user"
- " will be normalized."))
+ LOG.warning(('Distro has not implemented default user access. No '
+ 'distribution provided default user will be normalized.'))
- # Merge the old user (which may just be an empty dict when not
- # present with the distro provided default user configuration so
- # that the old user style picks up all the distribution specific
- # attributes (if any)
+ # Merge the old user (which may just be an empty dict when not present)
+ # with the distro provided default user configuration so that the old user
+ # style picks up all the distribution specific attributes (if any)
default_user_config = util.mergemanydict([old_user, distro_user_config])
base_users = cfg.get('users', [])
if not isinstance(base_users, (list, dict, str)):
LOG.warning(("Format for 'users' key must be a comma separated string"
- " or a dictionary or a list and not %s"),
+ " or a dictionary or a list but found %s"),
type_utils.obj_name(base_users))
base_users = []
if old_user:
- # Ensure that when user: is provided that this user
- # always gets added (as the default user)
+ # When 'user:' is provided, it should be made as the default user
if isinstance(base_users, list):
- # Just add it on at the end...
base_users.append({'name': 'default'})
elif isinstance(base_users, dict):
base_users['default'] = dict(base_users).get('default', True)
elif isinstance(base_users, str):
- # Just append it on to be re-parsed later
- base_users += ",default"
+ base_users += ',default'
+
+ groups = {}
+ if 'groups' in cfg:
+ groups = _normalize_groups(cfg['groups'])
users = _normalize_users(base_users, default_user_config)
return (users, groups)
-# Given a user dictionary config it will
-# extract the default user name and user config
-# from that list and return that tuple or
-# return (None, None) if no default user is
-# found in the given input
+# Given a user dictionary config, extract the default user name and user config
+# and return them or return (None, None) if no default user is found
def extract_default(users, default_name=None, default_config=None):
if not users:
- users = {}
+ return (default_name, default_config)
def safe_find(entry):
config = entry[1]
if not config or 'default' not in config:
return False
- else:
- return config['default']
+ return config['default']
- tmp_users = users.items()
- tmp_users = dict(filter(safe_find, tmp_users))
+ tmp_users = dict(filter(safe_find, users.items()))
if not tmp_users:
return (default_name, default_config)
- else:
- name = list(tmp_users)[0]
- config = tmp_users[name]
- config.pop('default', None)
- return (name, config)
+
+ name = list(tmp_users)[0]
+ config = tmp_users[name]
+ config.pop('default', None)
+ return (name, config)
# vi: ts=4 expandtab