summaryrefslogtreecommitdiff
path: root/cloudinit
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit')
-rw-r--r--cloudinit/ssh_util.py133
-rw-r--r--cloudinit/util.py51
2 files changed, 169 insertions, 15 deletions
diff --git a/cloudinit/ssh_util.py b/cloudinit/ssh_util.py
index 89057262..b8a3c8f7 100644
--- a/cloudinit/ssh_util.py
+++ b/cloudinit/ssh_util.py
@@ -249,6 +249,113 @@ def render_authorizedkeysfile_paths(value, homedir, username):
return rendered
+# Inspired from safe_path() in openssh source code (misc.c).
+def check_permissions(username, current_path, full_path, is_file, strictmodes):
+ """Check if the file/folder in @current_path has the right permissions.
+
+ We need to check that:
+ 1. If StrictMode is enabled, the owner is either root or the user
+ 2. the user can access the file/folder, otherwise ssh won't use it
+ 3. If StrictMode is enabled, no write permission is given to group
+ and world users (022)
+ """
+
+ # group/world can only execute the folder (access)
+ minimal_permissions = 0o711
+ if is_file:
+ # group/world can only read the file
+ minimal_permissions = 0o644
+
+ # 1. owner must be either root or the user itself
+ owner = util.get_owner(current_path)
+ if strictmodes and owner != username and owner != "root":
+ LOG.debug("Path %s in %s must be own by user %s or"
+ " by root, but instead is own by %s. Ignoring key.",
+ current_path, full_path, username, owner)
+ return False
+
+ parent_permission = util.get_permissions(current_path)
+ # 2. the user can access the file/folder, otherwise ssh won't use it
+ if owner == username:
+ # need only the owner permissions
+ minimal_permissions &= 0o700
+ else:
+ group_owner = util.get_group(current_path)
+ user_groups = util.get_user_groups(username)
+
+ if group_owner in user_groups:
+ # need only the group permissions
+ minimal_permissions &= 0o070
+ else:
+ # need only the world permissions
+ minimal_permissions &= 0o007
+
+ if parent_permission & minimal_permissions == 0:
+ LOG.debug("Path %s in %s must be accessible by user %s,"
+ " check its permissions",
+ current_path, full_path, username)
+ return False
+
+ # 3. no write permission (w) is given to group and world users (022)
+ # Group and world user can still have +rx.
+ if strictmodes and parent_permission & 0o022 != 0:
+ LOG.debug("Path %s in %s must not give write"
+ "permission to group or world users. Ignoring key.",
+ current_path, full_path)
+ return False
+
+ return True
+
+
+def check_create_path(username, filename, strictmodes):
+ user_pwent = users_ssh_info(username)[1]
+ root_pwent = users_ssh_info("root")[1]
+ try:
+ # check the directories first
+ directories = filename.split("/")[1:-1]
+
+ # scan in order, from root to file name
+ parent_folder = ""
+ # this is to comply also with unit tests, and
+ # strange home directories
+ home_folder = os.path.dirname(user_pwent.pw_dir)
+ for directory in directories:
+ parent_folder += "/" + directory
+ if home_folder.startswith(parent_folder):
+ continue
+
+ if not os.path.isdir(parent_folder):
+ # directory does not exist, and permission so far are good:
+ # create the directory, and make it accessible by everyone
+ # but owned by root, as it might be used by many users.
+ with util.SeLinuxGuard(parent_folder):
+ os.makedirs(parent_folder, mode=0o755, exist_ok=True)
+ util.chownbyid(parent_folder, root_pwent.pw_uid,
+ root_pwent.pw_gid)
+
+ permissions = check_permissions(username, parent_folder,
+ filename, False, strictmodes)
+ if not permissions:
+ return False
+
+ # check the file
+ if not os.path.exists(filename):
+ # if file does not exist: we need to create it, since the
+ # folders at this point exist and have right permissions
+ util.write_file(filename, '', mode=0o600, ensure_dir_exists=True)
+ util.chownbyid(filename, user_pwent.pw_uid, user_pwent.pw_gid)
+
+ permissions = check_permissions(username, filename,
+ filename, True, strictmodes)
+ if not permissions:
+ return False
+ except (IOError, OSError) as e:
+ util.logexc(LOG, str(e))
+ return False
+
+ return True
+
+
def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG):
(ssh_dir, pw_ent) = users_ssh_info(username)
default_authorizedkeys_file = os.path.join(ssh_dir, 'authorized_keys')
@@ -259,6 +366,7 @@ def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG):
ssh_cfg = parse_ssh_config_map(sshd_cfg_file)
key_paths = ssh_cfg.get("authorizedkeysfile",
"%h/.ssh/authorized_keys")
+ strictmodes = ssh_cfg.get("strictmodes", "yes")
auth_key_fns = render_authorizedkeysfile_paths(
key_paths, pw_ent.pw_dir, username)
@@ -269,31 +377,31 @@ def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG):
"config from %r, using 'AuthorizedKeysFile' file "
"%r instead", DEF_SSHD_CFG, auth_key_fns[0])
- # check if one of the keys is the user's one
+ # check if one of the keys is the user's one and has the right permissions
for key_path, auth_key_fn in zip(key_paths.split(), auth_key_fns):
if any([
'%u' in key_path,
'%h' in key_path,
auth_key_fn.startswith('{}/'.format(pw_ent.pw_dir))
]):
- user_authorizedkeys_file = auth_key_fn
+ permissions_ok = check_create_path(username, auth_key_fn,
+ strictmodes == "yes")
+ if permissions_ok:
+ user_authorizedkeys_file = auth_key_fn
+ break
if user_authorizedkeys_file != default_authorizedkeys_file:
LOG.debug(
"AuthorizedKeysFile has an user-specific authorized_keys, "
"using %s", user_authorizedkeys_file)
- # always store all the keys in the user's private file
- return (user_authorizedkeys_file, parse_authorized_keys(auth_key_fns))
+ return (
+ user_authorizedkeys_file,
+ parse_authorized_keys([user_authorizedkeys_file])
+ )
def setup_user_keys(keys, username, options=None):
- # Make sure the users .ssh dir is setup accordingly
- (ssh_dir, pwent) = users_ssh_info(username)
- if not os.path.isdir(ssh_dir):
- util.ensure_dir(ssh_dir, mode=0o700)
- util.chownbyid(ssh_dir, pwent.pw_uid, pwent.pw_gid)
-
# Turn the 'update' keys given into actual entries
parser = AuthKeyLineParser()
key_entries = []
@@ -302,11 +410,10 @@ def setup_user_keys(keys, username, options=None):
# Extract the old and make the new
(auth_key_fn, auth_key_entries) = extract_authorized_keys(username)
+ ssh_dir = os.path.dirname(auth_key_fn)
with util.SeLinuxGuard(ssh_dir, recursive=True):
content = update_authorized_keys(auth_key_entries, key_entries)
- util.ensure_dir(os.path.dirname(auth_key_fn), mode=0o700)
- util.write_file(auth_key_fn, content, mode=0o600)
- util.chownbyid(auth_key_fn, pwent.pw_uid, pwent.pw_gid)
+ util.write_file(auth_key_fn, content, preserve_mode=True)
class SshdConfigLine(object):
diff --git a/cloudinit/util.py b/cloudinit/util.py
index d3ced463..c53f6453 100644
--- a/cloudinit/util.py
+++ b/cloudinit/util.py
@@ -35,6 +35,7 @@ from base64 import b64decode, b64encode
from errno import ENOENT
from functools import lru_cache
from urllib import parse
+from typing import List
from cloudinit import importer
from cloudinit import log as logging
@@ -1878,6 +1879,53 @@ def chmod(path, mode):
os.chmod(path, real_mode)
+def get_permissions(path: str) -> int:
+ """
+ Returns the octal permissions of the file/folder pointed by the path,
+ encoded as an int.
+
+ @param path: The full path of the file/folder.
+ """
+
+ return stat.S_IMODE(os.stat(path).st_mode)
+
+
+def get_owner(path: str) -> str:
+ """
+ Returns the owner of the file/folder pointed by the path.
+
+ @param path: The full path of the file/folder.
+ """
+ st = os.stat(path)
+ return pwd.getpwuid(st.st_uid).pw_name
+
+
+def get_group(path: str) -> str:
+ """
+ Returns the group of the file/folder pointed by the path.
+
+ @param path: The full path of the file/folder.
+ """
+ st = os.stat(path)
+ return grp.getgrgid(st.st_gid).gr_name
+
+
+def get_user_groups(username: str) -> List[str]:
+ """
+ Returns a list of all groups to which the user belongs
+
+ @param username: the user we want to check
+ """
+ groups = []
+ for group in grp.getgrall():
+ if username in group.gr_mem:
+ groups.append(group.gr_name)
+
+ gid = pwd.getpwnam(username).pw_gid
+ groups.append(grp.getgrgid(gid).gr_name)
+ return groups
+
+
def write_file(
filename,
content,
@@ -1904,8 +1952,7 @@ def write_file(
if preserve_mode:
try:
- file_stat = os.stat(filename)
- mode = stat.S_IMODE(file_stat.st_mode)
+ mode = get_permissions(filename)
except OSError:
pass