Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 120 additions & 13 deletions cloudinit/ssh_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,113 @@ def render_authorizedkeysfile_paths(value, homedir, username):
return rendered


# Inspired from safe_path() in openssh source code (misc.c).
def check_permissions(username, current_path, full_path, is_file, strictmodes):
"""Check if the file/folder in @current_path has the right permissions.

We need to check that:
1. If StrictMode is enabled, the owner is either root or the user
2. the user can access the file/folder, otherwise ssh won't use it
3. If StrictMode is enabled, no write permission is given to group
and world users (022)
"""

# group/world can only execute the folder (access)
minimal_permissions = 0o711
if is_file:
# group/world can only read the file
minimal_permissions = 0o644

# 1. owner must be either root or the user itself
owner = util.get_owner(current_path)
if strictmodes and owner != username and owner != "root":
LOG.debug("Path %s in %s must be own by user %s or"
" by root, but instead is own by %s. Ignoring key.",
current_path, full_path, username, owner)
return False

parent_permission = util.get_permissions(current_path)
# 2. the user can access the file/folder, otherwise ssh won't use it
if owner == username:
# need only the owner permissions
minimal_permissions &= 0o700
else:
group_owner = util.get_group(current_path)
user_groups = util.get_user_groups(username)

if group_owner in user_groups:
# need only the group permissions
minimal_permissions &= 0o070
else:
# need only the world permissions
minimal_permissions &= 0o007

if parent_permission & minimal_permissions == 0:
LOG.debug("Path %s in %s must be accessible by user %s,"
" check its permissions",
current_path, full_path, username)
return False

# 3. no write permission (w) is given to group and world users (022)
# Group and world user can still have +rx.
if strictmodes and parent_permission & 0o022 != 0:
LOG.debug("Path %s in %s must not give write"
"permission to group or world users. Ignoring key.",
current_path, full_path)
return False

return True


def check_create_path(username, filename, strictmodes):
user_pwent = users_ssh_info(username)[1]
root_pwent = users_ssh_info("root")[1]
try:
# check the directories first
directories = filename.split("/")[1:-1]

# scan in order, from root to file name
parent_folder = ""
# this is to comply also with unit tests, and
# strange home directories
home_folder = os.path.dirname(user_pwent.pw_dir)
for directory in directories:
parent_folder += "/" + directory
if home_folder.startswith(parent_folder):
continue

if not os.path.isdir(parent_folder):
# directory does not exist, and permission so far are good:
# create the directory, and make it accessible by everyone
# but owned by root, as it might be used by many users.
with util.SeLinuxGuard(parent_folder):
os.makedirs(parent_folder, mode=0o755, exist_ok=True)
util.chownbyid(parent_folder, root_pwent.pw_uid,
root_pwent.pw_gid)

permissions = check_permissions(username, parent_folder,
filename, False, strictmodes)
if not permissions:
return False

# check the file
if not os.path.exists(filename):
# if file does not exist: we need to create it, since the
# folders at this point exist and have right permissions
util.write_file(filename, '', mode=0o600, ensure_dir_exists=True)
util.chownbyid(filename, user_pwent.pw_uid, user_pwent.pw_gid)

permissions = check_permissions(username, filename,
filename, True, strictmodes)
if not permissions:
return False
except (IOError, OSError) as e:
util.logexc(LOG, str(e))
return False

return True


def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG):
(ssh_dir, pw_ent) = users_ssh_info(username)
default_authorizedkeys_file = os.path.join(ssh_dir, 'authorized_keys')
Expand All @@ -259,6 +366,7 @@ def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG):
ssh_cfg = parse_ssh_config_map(sshd_cfg_file)
key_paths = ssh_cfg.get("authorizedkeysfile",
"%h/.ssh/authorized_keys")
strictmodes = ssh_cfg.get("strictmodes", "yes")
auth_key_fns = render_authorizedkeysfile_paths(
key_paths, pw_ent.pw_dir, username)

Expand All @@ -269,31 +377,31 @@ def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG):
"config from %r, using 'AuthorizedKeysFile' file "
"%r instead", DEF_SSHD_CFG, auth_key_fns[0])

# check if one of the keys is the user's one
# check if one of the keys is the user's one and has the right permissions
for key_path, auth_key_fn in zip(key_paths.split(), auth_key_fns):
if any([
'%u' in key_path,
'%h' in key_path,
auth_key_fn.startswith('{}/'.format(pw_ent.pw_dir))
]):
user_authorizedkeys_file = auth_key_fn
permissions_ok = check_create_path(username, auth_key_fn,
strictmodes == "yes")
if permissions_ok:
user_authorizedkeys_file = auth_key_fn
break

if user_authorizedkeys_file != default_authorizedkeys_file:
LOG.debug(
"AuthorizedKeysFile has an user-specific authorized_keys, "
"using %s", user_authorizedkeys_file)

# always store all the keys in the user's private file
return (user_authorizedkeys_file, parse_authorized_keys(auth_key_fns))
return (
user_authorizedkeys_file,
parse_authorized_keys([user_authorizedkeys_file])
)


def setup_user_keys(keys, username, options=None):
# Make sure the users .ssh dir is setup accordingly
(ssh_dir, pwent) = users_ssh_info(username)
if not os.path.isdir(ssh_dir):
util.ensure_dir(ssh_dir, mode=0o700)
util.chownbyid(ssh_dir, pwent.pw_uid, pwent.pw_gid)

# Turn the 'update' keys given into actual entries
parser = AuthKeyLineParser()
key_entries = []
Expand All @@ -302,11 +410,10 @@ def setup_user_keys(keys, username, options=None):

# Extract the old and make the new
(auth_key_fn, auth_key_entries) = extract_authorized_keys(username)
ssh_dir = os.path.dirname(auth_key_fn)
with util.SeLinuxGuard(ssh_dir, recursive=True):
content = update_authorized_keys(auth_key_entries, key_entries)
util.ensure_dir(os.path.dirname(auth_key_fn), mode=0o700)
util.write_file(auth_key_fn, content, mode=0o600)
util.chownbyid(auth_key_fn, pwent.pw_uid, pwent.pw_gid)
util.write_file(auth_key_fn, content, preserve_mode=True)


class SshdConfigLine(object):
Expand Down
51 changes: 49 additions & 2 deletions cloudinit/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from errno import ENOENT
from functools import lru_cache
from urllib import parse
from typing import List

from cloudinit import importer
from cloudinit import log as logging
Expand Down Expand Up @@ -1878,6 +1879,53 @@ def chmod(path, mode):
os.chmod(path, real_mode)


def get_permissions(path: str) -> int:
"""
Returns the octal permissions of the file/folder pointed by the path,
encoded as an int.

@param path: The full path of the file/folder.
"""

return stat.S_IMODE(os.stat(path).st_mode)


def get_owner(path: str) -> str:
"""
Returns the owner of the file/folder pointed by the path.

@param path: The full path of the file/folder.
"""
st = os.stat(path)
return pwd.getpwuid(st.st_uid).pw_name


def get_group(path: str) -> str:
"""
Returns the group of the file/folder pointed by the path.

@param path: The full path of the file/folder.
"""
st = os.stat(path)
return grp.getgrgid(st.st_gid).gr_name


def get_user_groups(username: str) -> List[str]:
"""
Returns a list of all groups to which the user belongs

@param username: the user we want to check
"""
groups = []
for group in grp.getgrall():
if username in group.gr_mem:
groups.append(group.gr_name)

gid = pwd.getpwnam(username).pw_gid
groups.append(grp.getgrgid(gid).gr_name)
return groups


def write_file(
filename,
content,
Expand All @@ -1904,8 +1952,7 @@ def write_file(

if preserve_mode:
try:
file_stat = os.stat(filename)
mode = stat.S_IMODE(file_stat.st_mode)
mode = get_permissions(filename)
except OSError:
pass

Expand Down
Loading