From 60ca0a0eca6f027e5bc41a60c18535b51498c62e Mon Sep 17 00:00:00 2001 From: James Falcon Date: Mon, 19 Jul 2021 10:01:09 -0500 Subject: [PATCH 1/3] Stop storing all keys in user's authorized keys file --- cloudinit/ssh_util.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cloudinit/ssh_util.py b/cloudinit/ssh_util.py index 89057262b73..56fef3d6ac4 100644 --- a/cloudinit/ssh_util.py +++ b/cloudinit/ssh_util.py @@ -283,8 +283,10 @@ def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG): "AuthorizedKeysFile has an user-specific authorized_keys, " "using %s", user_authorizedkeys_file) - # always store all the keys in the user's private file - return (user_authorizedkeys_file, parse_authorized_keys(auth_key_fns)) + return ( + user_authorizedkeys_file, + parse_authorized_keys([user_authorizedkeys_file]) + ) def setup_user_keys(keys, username, options=None): From 9feec0ca56a2178f987b59ac1360fc048ae0142f Mon Sep 17 00:00:00 2001 From: Emanuele Giuseppe Esposito Date: Wed, 4 Aug 2021 21:46:22 +0200 Subject: [PATCH 2/3] Consider folder permission when chosing user_authorizedkeys_file in extract_authorized_keys In /etc/ssh/sshd_config, it is possible to define a custom authorized_keys file that will contain the keys allowed to access the machine via the AuthorizedKeysFile option. Cloudinit is able to add user-specific keys to the existing ones, but we need to be careful on which of the authorized_keys files listed to pick. Chosing for example a file that is shared by all user will cause security issues, because the owner of that key can then access also other users. We therefore pick an authorized_keys file only if it satisfies the following conditions: 1. it is not a "global" file, ie it must be defined in AuthorizedKeysFile with %u, %h or be in /home/. This avoids security issues. 2. it must comply with ssh permission requirements, otherwise the ssh agent won't use that file. We also need to consider the case when the chosen authorized_keys file does not exist. In this case, the existing behavior of cloud-init is to create the new file. We therefore need to be sure that the file complies with ssh permissions too, by setting: - the actual file to permission 600, and owned by the user - the directories in the path that do not exist must be root owned and with permission 755. Signed-off-by: Emanuele Giuseppe Esposito --- cloudinit/ssh_util.py | 127 ++++++++++++++++++++++++++++++++++++++---- cloudinit/util.py | 51 ++++++++++++++++- 2 files changed, 165 insertions(+), 13 deletions(-) diff --git a/cloudinit/ssh_util.py b/cloudinit/ssh_util.py index 56fef3d6ac4..b8a3c8f75ae 100644 --- a/cloudinit/ssh_util.py +++ b/cloudinit/ssh_util.py @@ -249,6 +249,113 @@ def render_authorizedkeysfile_paths(value, homedir, username): return rendered +# Inspired from safe_path() in openssh source code (misc.c). +def check_permissions(username, current_path, full_path, is_file, strictmodes): + """Check if the file/folder in @current_path has the right permissions. + + We need to check that: + 1. If StrictMode is enabled, the owner is either root or the user + 2. the user can access the file/folder, otherwise ssh won't use it + 3. If StrictMode is enabled, no write permission is given to group + and world users (022) + """ + + # group/world can only execute the folder (access) + minimal_permissions = 0o711 + if is_file: + # group/world can only read the file + minimal_permissions = 0o644 + + # 1. owner must be either root or the user itself + owner = util.get_owner(current_path) + if strictmodes and owner != username and owner != "root": + LOG.debug("Path %s in %s must be own by user %s or" + " by root, but instead is own by %s. Ignoring key.", + current_path, full_path, username, owner) + return False + + parent_permission = util.get_permissions(current_path) + # 2. the user can access the file/folder, otherwise ssh won't use it + if owner == username: + # need only the owner permissions + minimal_permissions &= 0o700 + else: + group_owner = util.get_group(current_path) + user_groups = util.get_user_groups(username) + + if group_owner in user_groups: + # need only the group permissions + minimal_permissions &= 0o070 + else: + # need only the world permissions + minimal_permissions &= 0o007 + + if parent_permission & minimal_permissions == 0: + LOG.debug("Path %s in %s must be accessible by user %s," + " check its permissions", + current_path, full_path, username) + return False + + # 3. no write permission (w) is given to group and world users (022) + # Group and world user can still have +rx. + if strictmodes and parent_permission & 0o022 != 0: + LOG.debug("Path %s in %s must not give write" + "permission to group or world users. Ignoring key.", + current_path, full_path) + return False + + return True + + +def check_create_path(username, filename, strictmodes): + user_pwent = users_ssh_info(username)[1] + root_pwent = users_ssh_info("root")[1] + try: + # check the directories first + directories = filename.split("/")[1:-1] + + # scan in order, from root to file name + parent_folder = "" + # this is to comply also with unit tests, and + # strange home directories + home_folder = os.path.dirname(user_pwent.pw_dir) + for directory in directories: + parent_folder += "/" + directory + if home_folder.startswith(parent_folder): + continue + + if not os.path.isdir(parent_folder): + # directory does not exist, and permission so far are good: + # create the directory, and make it accessible by everyone + # but owned by root, as it might be used by many users. + with util.SeLinuxGuard(parent_folder): + os.makedirs(parent_folder, mode=0o755, exist_ok=True) + util.chownbyid(parent_folder, root_pwent.pw_uid, + root_pwent.pw_gid) + + permissions = check_permissions(username, parent_folder, + filename, False, strictmodes) + if not permissions: + return False + + # check the file + if not os.path.exists(filename): + # if file does not exist: we need to create it, since the + # folders at this point exist and have right permissions + util.write_file(filename, '', mode=0o600, ensure_dir_exists=True) + util.chownbyid(filename, user_pwent.pw_uid, user_pwent.pw_gid) + + permissions = check_permissions(username, filename, + filename, True, strictmodes) + if not permissions: + return False + except (IOError, OSError) as e: + util.logexc(LOG, str(e)) + return False + + return True + + def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG): (ssh_dir, pw_ent) = users_ssh_info(username) default_authorizedkeys_file = os.path.join(ssh_dir, 'authorized_keys') @@ -259,6 +366,7 @@ def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG): ssh_cfg = parse_ssh_config_map(sshd_cfg_file) key_paths = ssh_cfg.get("authorizedkeysfile", "%h/.ssh/authorized_keys") + strictmodes = ssh_cfg.get("strictmodes", "yes") auth_key_fns = render_authorizedkeysfile_paths( key_paths, pw_ent.pw_dir, username) @@ -269,14 +377,18 @@ def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG): "config from %r, using 'AuthorizedKeysFile' file " "%r instead", DEF_SSHD_CFG, auth_key_fns[0]) - # check if one of the keys is the user's one + # check if one of the keys is the user's one and has the right permissions for key_path, auth_key_fn in zip(key_paths.split(), auth_key_fns): if any([ '%u' in key_path, '%h' in key_path, auth_key_fn.startswith('{}/'.format(pw_ent.pw_dir)) ]): - user_authorizedkeys_file = auth_key_fn + permissions_ok = check_create_path(username, auth_key_fn, + strictmodes == "yes") + if permissions_ok: + user_authorizedkeys_file = auth_key_fn + break if user_authorizedkeys_file != default_authorizedkeys_file: LOG.debug( @@ -290,12 +402,6 @@ def extract_authorized_keys(username, sshd_cfg_file=DEF_SSHD_CFG): def setup_user_keys(keys, username, options=None): - # Make sure the users .ssh dir is setup accordingly - (ssh_dir, pwent) = users_ssh_info(username) - if not os.path.isdir(ssh_dir): - util.ensure_dir(ssh_dir, mode=0o700) - util.chownbyid(ssh_dir, pwent.pw_uid, pwent.pw_gid) - # Turn the 'update' keys given into actual entries parser = AuthKeyLineParser() key_entries = [] @@ -304,11 +410,10 @@ def setup_user_keys(keys, username, options=None): # Extract the old and make the new (auth_key_fn, auth_key_entries) = extract_authorized_keys(username) + ssh_dir = os.path.dirname(auth_key_fn) with util.SeLinuxGuard(ssh_dir, recursive=True): content = update_authorized_keys(auth_key_entries, key_entries) - util.ensure_dir(os.path.dirname(auth_key_fn), mode=0o700) - util.write_file(auth_key_fn, content, mode=0o600) - util.chownbyid(auth_key_fn, pwent.pw_uid, pwent.pw_gid) + util.write_file(auth_key_fn, content, preserve_mode=True) class SshdConfigLine(object): diff --git a/cloudinit/util.py b/cloudinit/util.py index 3bed1aed607..cfb0a3c3cd7 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -35,6 +35,7 @@ from errno import ENOENT from functools import lru_cache from urllib import parse +from typing import List from cloudinit import importer from cloudinit import log as logging @@ -1878,6 +1879,53 @@ def chmod(path, mode): os.chmod(path, real_mode) +def get_permissions(path: str) -> int: + """ + Returns the octal permissions of the file/folder pointed by the path, + encoded as an int. + + @param path: The full path of the file/folder. + """ + + return stat.S_IMODE(os.stat(path).st_mode) + + +def get_owner(path: str) -> str: + """ + Returns the owner of the file/folder pointed by the path. + + @param path: The full path of the file/folder. + """ + st = os.stat(path) + return pwd.getpwuid(st.st_uid).pw_name + + +def get_group(path: str) -> str: + """ + Returns the group of the file/folder pointed by the path. + + @param path: The full path of the file/folder. + """ + st = os.stat(path) + return grp.getgrgid(st.st_gid).gr_name + + +def get_user_groups(username: str) -> List[str]: + """ + Returns a list of all groups to which the user belongs + + @param username: the user we want to check + """ + groups = [] + for group in grp.getgrall(): + if username in group.gr_mem: + groups.append(group.gr_name) + + gid = pwd.getpwnam(username).pw_gid + groups.append(grp.getgrgid(gid).gr_name) + return groups + + def write_file( filename, content, @@ -1904,8 +1952,7 @@ def write_file( if preserve_mode: try: - file_stat = os.stat(filename) - mode = stat.S_IMODE(file_stat.st_mode) + mode = get_permissions(filename) except OSError: pass From d66a44c4c774f335badb11e6949fe64fc8bda90e Mon Sep 17 00:00:00 2001 From: Emanuele Giuseppe Esposito Date: Wed, 4 Aug 2021 21:57:36 +0200 Subject: [PATCH 3/3] Fix existing tests and add new onew, to reflect the behavior we want Signed-off-by: Emanuele Giuseppe Esposito --- tests/unittests/test_sshutil.py | 952 +++++++++++++++++++++++++------- 1 file changed, 751 insertions(+), 201 deletions(-) diff --git a/tests/unittests/test_sshutil.py b/tests/unittests/test_sshutil.py index bcb8044fd5d..a66788bfcce 100644 --- a/tests/unittests/test_sshutil.py +++ b/tests/unittests/test_sshutil.py @@ -1,6 +1,9 @@ # This file is part of cloud-init. See LICENSE file for license information. +import os + from collections import namedtuple +from functools import partial from unittest.mock import patch from cloudinit import ssh_util @@ -8,13 +11,48 @@ from cloudinit import util # https://stackoverflow.com/questions/11351032/ -FakePwEnt = namedtuple( - 'FakePwEnt', - ['pw_dir', 'pw_gecos', 'pw_name', 'pw_passwd', 'pw_shell', 'pwd_uid']) +FakePwEnt = namedtuple('FakePwEnt', [ + 'pw_name', + 'pw_passwd', + 'pw_uid', + 'pw_gid', + 'pw_gecos', + 'pw_dir', + 'pw_shell', +]) FakePwEnt.__new__.__defaults__ = tuple( "UNSET_%s" % n for n in FakePwEnt._fields) +def mock_get_owner(updated_permissions, value): + try: + return updated_permissions[value][0] + except ValueError: + return util.get_owner(value) + + +def mock_get_group(updated_permissions, value): + try: + return updated_permissions[value][1] + except ValueError: + return util.get_group(value) + + +def mock_get_user_groups(username): + return username + + +def mock_get_permissions(updated_permissions, value): + try: + return updated_permissions[value][2] + except ValueError: + return util.get_permissions(value) + + +def mock_getpwnam(users, username): + return users[username] + + # Do not use these public keys, most of them are fetched from # the testdata for OpenSSH, and their private keys are available # https://github.com/openssh/openssh-portable/tree/master/regress/unittests/sshkey/testdata @@ -552,12 +590,30 @@ def test_user(self): ssh_util.render_authorizedkeysfile_paths( "/opt/%u/keys", "/home/bobby", "bobby")) + def test_user_file(self): + self.assertEqual( + ["/opt/bobby"], + ssh_util.render_authorizedkeysfile_paths( + "/opt/%u", "/home/bobby", "bobby")) + + def test_user_file2(self): + self.assertEqual( + ["/opt/bobby/bobby"], + ssh_util.render_authorizedkeysfile_paths( + "/opt/%u/%u", "/home/bobby", "bobby")) + def test_multiple(self): self.assertEqual( ["/keys/path1", "/keys/path2"], ssh_util.render_authorizedkeysfile_paths( "/keys/path1 /keys/path2", "/home/bobby", "bobby")) + def test_multiple2(self): + self.assertEqual( + ["/keys/path1", "/keys/bobby"], + ssh_util.render_authorizedkeysfile_paths( + "/keys/path1 /keys/%u", "/home/bobby", "bobby")) + def test_relative(self): self.assertEqual( ["/home/bobby/.secret/keys"], @@ -581,269 +637,763 @@ def test_all(self): class TestMultipleSshAuthorizedKeysFile(test_helpers.CiTestCase): - @patch("cloudinit.ssh_util.pwd.getpwnam") - def test_multiple_authorizedkeys_file_order1(self, m_getpwnam): - fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') - m_getpwnam.return_value = fpw - user_ssh_folder = "%s/.ssh" % fpw.pw_dir - - # /tmp/home2/bobby/.ssh/authorized_keys = rsa - authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder) - util.write_file(authorized_keys, VALID_CONTENT['rsa']) - - # /tmp/home2/bobby/.ssh/user_keys = dsa - user_keys = self.tmp_path('user_keys', dir=user_ssh_folder) - util.write_file(user_keys, VALID_CONTENT['dsa']) - - # /tmp/sshd_config + def create_fake_users(self, names, mock_permissions, + m_get_group, m_get_owner, m_get_permissions, + m_getpwnam, users): + homes = [] + + root = '/tmp/root' + fpw = FakePwEnt(pw_name="root", pw_dir=root) + users["root"] = fpw + + for name in names: + home = '/tmp/home/' + name + fpw = FakePwEnt(pw_name=name, pw_dir=home) + users[name] = fpw + homes.append(home) + + m_get_permissions.side_effect = partial( + mock_get_permissions, mock_permissions) + m_get_owner.side_effect = partial(mock_get_owner, mock_permissions) + m_get_group.side_effect = partial(mock_get_group, mock_permissions) + m_getpwnam.side_effect = partial(mock_getpwnam, users) + return homes + + def create_user_authorized_file(self, home, filename, content_key, keys): + user_ssh_folder = "%s/.ssh" % home + # /tmp/home//.ssh/authorized_keys = content_key + authorized_keys = self.tmp_path(filename, dir=user_ssh_folder) + util.write_file(authorized_keys, VALID_CONTENT[content_key]) + keys[authorized_keys] = content_key + return authorized_keys + + def create_global_authorized_file(self, filename, content_key, keys): + authorized_keys = self.tmp_path(filename, dir='/tmp') + util.write_file(authorized_keys, VALID_CONTENT[content_key]) + keys[authorized_keys] = content_key + return authorized_keys + + def create_sshd_config(self, authorized_keys_files): sshd_config = self.tmp_path('sshd_config', dir="/tmp") util.write_file( sshd_config, - "AuthorizedKeysFile %s %s" % (authorized_keys, user_keys) + "AuthorizedKeysFile " + authorized_keys_files ) + return sshd_config + def execute_and_check(self, user, sshd_config, solution, keys, + delete_keys=True): (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( - fpw.pw_name, sshd_config) + user, sshd_config) content = ssh_util.update_authorized_keys(auth_key_entries, []) - self.assertEqual(user_keys, auth_key_fn) - self.assertTrue(VALID_CONTENT['rsa'] in content) - self.assertTrue(VALID_CONTENT['dsa'] in content) + self.assertEqual(auth_key_fn, solution) + for path, key in keys.items(): + if path == solution: + self.assertTrue(VALID_CONTENT[key] in content) + else: + self.assertFalse(VALID_CONTENT[key] in content) + + if delete_keys and os.path.isdir("/tmp/home/"): + util.delete_dir_contents("/tmp/home/") @patch("cloudinit.ssh_util.pwd.getpwnam") - def test_multiple_authorizedkeys_file_order2(self, m_getpwnam): - fpw = FakePwEnt(pw_name='suzie', pw_dir='/tmp/home/suzie') - m_getpwnam.return_value = fpw - user_ssh_folder = "%s/.ssh" % fpw.pw_dir + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_single_user_two_local_files( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + user_bobby = 'bobby' + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/user_keys': ('bobby', 'bobby', 0o600), + '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), + } + + homes = self.create_fake_users( + [user_bobby], mock_permissions, m_get_group, m_get_owner, + m_get_permissions, m_getpwnam, users + ) + home = homes[0] - # /tmp/home/suzie/.ssh/authorized_keys = rsa - authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder) - util.write_file(authorized_keys, VALID_CONTENT['rsa']) + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home, 'authorized_keys', 'rsa', keys + ) - # /tmp/home/suzie/.ssh/user_keys = dsa - user_keys = self.tmp_path('user_keys', dir=user_ssh_folder) - util.write_file(user_keys, VALID_CONTENT['dsa']) + # /tmp/home/bobby/.ssh/user_keys = dsa + user_keys = self.create_user_authorized_file( + home, 'user_keys', 'dsa', keys + ) # /tmp/sshd_config - sshd_config = self.tmp_path('sshd_config', dir="/tmp") - util.write_file( - sshd_config, - "AuthorizedKeysFile %s %s" % (user_keys, authorized_keys) + options = "%s %s" % (authorized_keys, user_keys) + sshd_config = self.create_sshd_config(options) + + self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys) + + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_single_user_two_local_files_inverted( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + user_bobby = 'bobby' + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/user_keys': ('bobby', 'bobby', 0o600), + '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), + } + + homes = self.create_fake_users( + [user_bobby], mock_permissions, m_get_group, m_get_owner, + m_get_permissions, m_getpwnam, users ) + home = homes[0] - (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( - fpw.pw_name, sshd_config) - content = ssh_util.update_authorized_keys(auth_key_entries, []) + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home, 'authorized_keys', 'rsa', keys + ) - self.assertEqual(authorized_keys, auth_key_fn) - self.assertTrue(VALID_CONTENT['rsa'] in content) - self.assertTrue(VALID_CONTENT['dsa'] in content) + # /tmp/home/bobby/.ssh/user_keys = dsa + user_keys = self.create_user_authorized_file( + home, 'user_keys', 'dsa', keys + ) - @patch("cloudinit.ssh_util.pwd.getpwnam") - def test_multiple_authorizedkeys_file_local_global(self, m_getpwnam): - fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') - m_getpwnam.return_value = fpw - user_ssh_folder = "%s/.ssh" % fpw.pw_dir + # /tmp/sshd_config + options = "%s %s" % (user_keys, authorized_keys) + sshd_config = self.create_sshd_config(options) - # /tmp/home2/bobby/.ssh/authorized_keys = rsa - authorized_keys = self.tmp_path('authorized_keys', dir=user_ssh_folder) - util.write_file(authorized_keys, VALID_CONTENT['rsa']) + self.execute_and_check(user_bobby, sshd_config, user_keys, keys) - # /tmp/home2/bobby/.ssh/user_keys = dsa - user_keys = self.tmp_path('user_keys', dir=user_ssh_folder) - util.write_file(user_keys, VALID_CONTENT['dsa']) + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_single_user_local_global_files( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + user_bobby = 'bobby' + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/user_keys': ('bobby', 'bobby', 0o600), + '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), + } + + homes = self.create_fake_users( + [user_bobby], mock_permissions, m_get_group, m_get_owner, + m_get_permissions, m_getpwnam, users + ) + home = homes[0] - # /tmp/etc/ssh/authorized_keys = ecdsa - authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys', - dir="/tmp") - util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa']) + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home, 'authorized_keys', 'rsa', keys + ) - # /tmp/sshd_config - sshd_config = self.tmp_path('sshd_config', dir="/tmp") - util.write_file( - sshd_config, - "AuthorizedKeysFile %s %s %s" % (authorized_keys_global, - user_keys, authorized_keys) + # /tmp/home/bobby/.ssh/user_keys = dsa + user_keys = self.create_user_authorized_file( + home, 'user_keys', 'dsa', keys ) - (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( - fpw.pw_name, sshd_config) - content = ssh_util.update_authorized_keys(auth_key_entries, []) + authorized_keys_global = self.create_global_authorized_file( + 'etc/ssh/authorized_keys', 'ecdsa', keys + ) - self.assertEqual(authorized_keys, auth_key_fn) - self.assertTrue(VALID_CONTENT['rsa'] in content) - self.assertTrue(VALID_CONTENT['ecdsa'] in content) - self.assertTrue(VALID_CONTENT['dsa'] in content) + options = "%s %s %s" % (authorized_keys_global, user_keys, + authorized_keys) + sshd_config = self.create_sshd_config(options) - @patch("cloudinit.ssh_util.pwd.getpwnam") - def test_multiple_authorizedkeys_file_local_global2(self, m_getpwnam): - fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') - m_getpwnam.return_value = fpw - user_ssh_folder = "%s/.ssh" % fpw.pw_dir + self.execute_and_check(user_bobby, sshd_config, user_keys, keys) - # /tmp/home2/bobby/.ssh/authorized_keys2 = rsa - authorized_keys = self.tmp_path('authorized_keys2', - dir=user_ssh_folder) - util.write_file(authorized_keys, VALID_CONTENT['rsa']) + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_single_user_local_global_files_inverted( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + user_bobby = 'bobby' + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/user_keys3': ('bobby', 'bobby', 0o600), + '/tmp/home/bobby/.ssh/authorized_keys2': ('bobby', 'bobby', 0o600), + } + + homes = self.create_fake_users( + [user_bobby], mock_permissions, m_get_group, m_get_owner, + m_get_permissions, m_getpwnam, users + ) + home = homes[0] - # /tmp/home2/bobby/.ssh/user_keys3 = dsa - user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder) - util.write_file(user_keys, VALID_CONTENT['dsa']) + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home, 'authorized_keys2', 'rsa', keys + ) - # /tmp/etc/ssh/authorized_keys = ecdsa - authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys', - dir="/tmp") - util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa']) + # /tmp/home/bobby/.ssh/user_keys = dsa + user_keys = self.create_user_authorized_file( + home, 'user_keys3', 'dsa', keys + ) - # /tmp/sshd_config - sshd_config = self.tmp_path('sshd_config', dir="/tmp") - util.write_file( - sshd_config, - "AuthorizedKeysFile %s %s %s" % (authorized_keys_global, - authorized_keys, user_keys) + authorized_keys_global = self.create_global_authorized_file( + 'etc/ssh/authorized_keys', 'ecdsa', keys ) - (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( - fpw.pw_name, sshd_config) - content = ssh_util.update_authorized_keys(auth_key_entries, []) + options = "%s %s %s" % (authorized_keys_global, authorized_keys, + user_keys) + sshd_config = self.create_sshd_config(options) - self.assertEqual(user_keys, auth_key_fn) - self.assertTrue(VALID_CONTENT['rsa'] in content) - self.assertTrue(VALID_CONTENT['ecdsa'] in content) - self.assertTrue(VALID_CONTENT['dsa'] in content) + self.execute_and_check(user_bobby, sshd_config, authorized_keys, keys) @patch("cloudinit.ssh_util.pwd.getpwnam") - def test_multiple_authorizedkeys_file_global(self, m_getpwnam): - fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') - m_getpwnam.return_value = fpw + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_single_user_global_file( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + user_bobby = 'bobby' + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), + } + + homes = self.create_fake_users( + [user_bobby], mock_permissions, m_get_group, m_get_owner, + m_get_permissions, m_getpwnam, users + ) + home = homes[0] # /tmp/etc/ssh/authorized_keys = rsa - authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys', - dir="/tmp") - util.write_file(authorized_keys_global, VALID_CONTENT['rsa']) + authorized_keys_global = self.create_global_authorized_file( + 'etc/ssh/authorized_keys', 'rsa', keys + ) - # /tmp/sshd_config - sshd_config = self.tmp_path('sshd_config') - util.write_file( - sshd_config, - "AuthorizedKeysFile %s" % (authorized_keys_global) + options = "%s" % authorized_keys_global + sshd_config = self.create_sshd_config(options) + + default = "%s/.ssh/authorized_keys" % home + self.execute_and_check(user_bobby, sshd_config, default, keys) + + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_local_file_standard( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), + '/tmp/home/suzie': ('suzie', 'suzie', 0o700), + '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700), + '/tmp/home/suzie/.ssh/authorized_keys': ('suzie', 'suzie', 0o600), + } + + user_bobby = 'bobby' + user_suzie = 'suzie' + homes = self.create_fake_users( + [user_bobby, user_suzie], mock_permissions, m_get_group, + m_get_owner, m_get_permissions, m_getpwnam, users ) + home_bobby = homes[0] + home_suzie = homes[1] - (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( - fpw.pw_name, sshd_config) - content = ssh_util.update_authorized_keys(auth_key_entries, []) + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home_bobby, 'authorized_keys', 'rsa', keys + ) - self.assertEqual("%s/.ssh/authorized_keys" % fpw.pw_dir, auth_key_fn) - self.assertTrue(VALID_CONTENT['rsa'] in content) + # /tmp/home/suzie/.ssh/authorized_keys = rsa + authorized_keys2 = self.create_user_authorized_file( + home_suzie, 'authorized_keys', 'ssh-xmss@openssh.com', keys + ) + + options = ".ssh/authorized_keys" + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) @patch("cloudinit.ssh_util.pwd.getpwnam") - def test_multiple_authorizedkeys_file_multiuser(self, m_getpwnam): - fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home2/bobby') - m_getpwnam.return_value = fpw - user_ssh_folder = "%s/.ssh" % fpw.pw_dir - # /tmp/home2/bobby/.ssh/authorized_keys2 = rsa - authorized_keys = self.tmp_path('authorized_keys2', - dir=user_ssh_folder) - util.write_file(authorized_keys, VALID_CONTENT['rsa']) - # /tmp/home2/bobby/.ssh/user_keys3 = dsa - user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder) - util.write_file(user_keys, VALID_CONTENT['dsa']) - - fpw2 = FakePwEnt(pw_name='suzie', pw_dir='/tmp/home/suzie') - user_ssh_folder = "%s/.ssh" % fpw2.pw_dir - # /tmp/home/suzie/.ssh/authorized_keys2 = ssh-xmss@openssh.com - authorized_keys2 = self.tmp_path('authorized_keys2', - dir=user_ssh_folder) - util.write_file(authorized_keys2, - VALID_CONTENT['ssh-xmss@openssh.com']) + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_local_file_custom( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/authorized_keys2': ('bobby', 'bobby', 0o600), + '/tmp/home/suzie': ('suzie', 'suzie', 0o700), + '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700), + '/tmp/home/suzie/.ssh/authorized_keys2': ('suzie', 'suzie', 0o600), + } + + user_bobby = 'bobby' + user_suzie = 'suzie' + homes = self.create_fake_users( + [user_bobby, user_suzie], mock_permissions, m_get_group, + m_get_owner, m_get_permissions, m_getpwnam, users + ) + home_bobby = homes[0] + home_suzie = homes[1] - # /tmp/etc/ssh/authorized_keys = ecdsa - authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys2', - dir="/tmp") - util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa']) + # /tmp/home/bobby/.ssh/authorized_keys2 = rsa + authorized_keys = self.create_user_authorized_file( + home_bobby, 'authorized_keys2', 'rsa', keys + ) - # /tmp/sshd_config - sshd_config = self.tmp_path('sshd_config', dir="/tmp") - util.write_file( - sshd_config, - "AuthorizedKeysFile %s %%h/.ssh/authorized_keys2 %s" % - (authorized_keys_global, user_keys) + # /tmp/home/suzie/.ssh/authorized_keys2 = rsa + authorized_keys2 = self.create_user_authorized_file( + home_suzie, 'authorized_keys2', 'ssh-xmss@openssh.com', keys ) - # process first user - (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( - fpw.pw_name, sshd_config) - content = ssh_util.update_authorized_keys(auth_key_entries, []) + options = ".ssh/authorized_keys2" + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) - self.assertEqual(user_keys, auth_key_fn) - self.assertTrue(VALID_CONTENT['rsa'] in content) - self.assertTrue(VALID_CONTENT['ecdsa'] in content) - self.assertTrue(VALID_CONTENT['dsa'] in content) - self.assertFalse(VALID_CONTENT['ssh-xmss@openssh.com'] in content) + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_local_global_files( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam + ): + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/authorized_keys2': ('bobby', 'bobby', 0o600), + '/tmp/home/bobby/.ssh/user_keys3': ('bobby', 'bobby', 0o600), + '/tmp/home/suzie': ('suzie', 'suzie', 0o700), + '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700), + '/tmp/home/suzie/.ssh/authorized_keys2': ('suzie', 'suzie', 0o600), + '/tmp/home/suzie/.ssh/user_keys3': ('suzie', 'suzie', 0o600), + } + + user_bobby = 'bobby' + user_suzie = 'suzie' + homes = self.create_fake_users( + [user_bobby, user_suzie], mock_permissions, m_get_group, + m_get_owner, m_get_permissions, m_getpwnam, users + ) + home_bobby = homes[0] + home_suzie = homes[1] - m_getpwnam.return_value = fpw2 - # process second user - (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( - fpw2.pw_name, sshd_config) - content = ssh_util.update_authorized_keys(auth_key_entries, []) + # /tmp/home/bobby/.ssh/authorized_keys2 = rsa + self.create_user_authorized_file( + home_bobby, 'authorized_keys2', 'rsa', keys + ) + # /tmp/home/bobby/.ssh/user_keys3 = dsa + user_keys = self.create_user_authorized_file( + home_bobby, 'user_keys3', 'dsa', keys + ) + + # /tmp/home/suzie/.ssh/authorized_keys2 = rsa + authorized_keys2 = self.create_user_authorized_file( + home_suzie, 'authorized_keys2', 'ssh-xmss@openssh.com', keys + ) + + # /tmp/etc/ssh/authorized_keys = ecdsa + authorized_keys_global = self.create_global_authorized_file( + 'etc/ssh/authorized_keys2', 'ecdsa', keys + ) + + options = "%s %s %%h/.ssh/authorized_keys2" % \ + (authorized_keys_global, user_keys) + sshd_config = self.create_sshd_config(options) - self.assertEqual(authorized_keys2, auth_key_fn) - self.assertTrue(VALID_CONTENT['ssh-xmss@openssh.com'] in content) - self.assertTrue(VALID_CONTENT['ecdsa'] in content) - self.assertTrue(VALID_CONTENT['dsa'] in content) - self.assertFalse(VALID_CONTENT['rsa'] in content) + self.execute_and_check( + user_bobby, sshd_config, user_keys, keys, delete_keys=False + ) + self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) + @patch("cloudinit.util.get_user_groups") @patch("cloudinit.ssh_util.pwd.getpwnam") - def test_multiple_authorizedkeys_file_multiuser2(self, m_getpwnam): - fpw = FakePwEnt(pw_name='bobby', pw_dir='/tmp/home/bobby') - m_getpwnam.return_value = fpw - user_ssh_folder = "%s/.ssh" % fpw.pw_dir + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_local_global_files_badguy( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, + m_get_user_groups + ): + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/authorized_keys2': ('bobby', 'bobby', 0o600), + '/tmp/home/bobby/.ssh/user_keys3': ('bobby', 'bobby', 0o600), + '/tmp/home/badguy': ('root', 'root', 0o755), + '/tmp/home/badguy/home': ('root', 'root', 0o755), + '/tmp/home/badguy/home/bobby': ('root', 'root', 0o655), + } + + user_bobby = 'bobby' + user_badguy = 'badguy' + home_bobby, *_ = self.create_fake_users( + [user_bobby, user_badguy], mock_permissions, m_get_group, + m_get_owner, m_get_permissions, m_getpwnam, users + ) + m_get_user_groups.side_effect = mock_get_user_groups + # /tmp/home/bobby/.ssh/authorized_keys2 = rsa - authorized_keys = self.tmp_path('authorized_keys2', - dir=user_ssh_folder) - util.write_file(authorized_keys, VALID_CONTENT['rsa']) + authorized_keys = self.create_user_authorized_file( + home_bobby, 'authorized_keys2', 'rsa', keys + ) # /tmp/home/bobby/.ssh/user_keys3 = dsa - user_keys = self.tmp_path('user_keys3', dir=user_ssh_folder) - util.write_file(user_keys, VALID_CONTENT['dsa']) + user_keys = self.create_user_authorized_file( + home_bobby, 'user_keys3', 'dsa', keys + ) - fpw2 = FakePwEnt(pw_name='badguy', pw_dir='/tmp/home/badguy') - user_ssh_folder = "%s/.ssh" % fpw2.pw_dir # /tmp/home/badguy/home/bobby = "" authorized_keys2 = self.tmp_path('home/bobby', dir="/tmp/home/badguy") + util.write_file(authorized_keys2, '') # /tmp/etc/ssh/authorized_keys = ecdsa - authorized_keys_global = self.tmp_path('etc/ssh/authorized_keys2', - dir="/tmp") - util.write_file(authorized_keys_global, VALID_CONTENT['ecdsa']) + authorized_keys_global = self.create_global_authorized_file( + 'etc/ssh/authorized_keys2', 'ecdsa', keys + ) # /tmp/sshd_config - sshd_config = self.tmp_path('sshd_config', dir="/tmp") - util.write_file( - sshd_config, - "AuthorizedKeysFile %s %%h/.ssh/authorized_keys2 %s %s" % - (authorized_keys_global, user_keys, authorized_keys2) + options = "%s %%h/.ssh/authorized_keys2 %s %s" % \ + (authorized_keys2, authorized_keys_global, user_keys) + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + self.execute_and_check( + user_badguy, sshd_config, authorized_keys2, keys ) - # process first user - (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( - fpw.pw_name, sshd_config) - content = ssh_util.update_authorized_keys(auth_key_entries, []) + @patch("cloudinit.util.get_user_groups") + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_unaccessible_file( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, + m_get_user_groups + ): + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), + + '/tmp/etc': ('root', 'root', 0o755), + '/tmp/etc/ssh': ('root', 'root', 0o755), + '/tmp/etc/ssh/userkeys': ('root', 'root', 0o700), + '/tmp/etc/ssh/userkeys/bobby': ('bobby', 'bobby', 0o600), + '/tmp/etc/ssh/userkeys/badguy': ('badguy', 'badguy', 0o600), + + '/tmp/home/badguy': ('badguy', 'badguy', 0o700), + '/tmp/home/badguy/.ssh': ('badguy', 'badguy', 0o700), + '/tmp/home/badguy/.ssh/authorized_keys': + ('badguy', 'badguy', 0o600), + } + + user_bobby = 'bobby' + user_badguy = 'badguy' + homes = self.create_fake_users( + [user_bobby, user_badguy], mock_permissions, m_get_group, + m_get_owner, m_get_permissions, m_getpwnam, users + ) + m_get_user_groups.side_effect = mock_get_user_groups + home_bobby = homes[0] + home_badguy = homes[1] - self.assertEqual(user_keys, auth_key_fn) - self.assertTrue(VALID_CONTENT['rsa'] in content) - self.assertTrue(VALID_CONTENT['ecdsa'] in content) - self.assertTrue(VALID_CONTENT['dsa'] in content) + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home_bobby, 'authorized_keys', 'rsa', keys + ) + # /tmp/etc/ssh/userkeys/bobby = dsa + # assume here that we can bypass userkeys, despite permissions + self.create_global_authorized_file( + 'etc/ssh/userkeys/bobby', 'dsa', keys + ) - m_getpwnam.return_value = fpw2 - # process second user - (auth_key_fn, auth_key_entries) = ssh_util.extract_authorized_keys( - fpw2.pw_name, sshd_config) - content = ssh_util.update_authorized_keys(auth_key_entries, []) + # /tmp/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com + authorized_keys2 = self.create_user_authorized_file( + home_badguy, 'authorized_keys', 'ssh-xmss@openssh.com', keys + ) - # badguy should not take the key from the other user! - self.assertEqual(authorized_keys2, auth_key_fn) - self.assertTrue(VALID_CONTENT['ecdsa'] in content) - self.assertTrue(VALID_CONTENT['dsa'] in content) - self.assertFalse(VALID_CONTENT['rsa'] in content) + # /tmp/etc/ssh/userkeys/badguy = ecdsa + self.create_global_authorized_file( + 'etc/ssh/userkeys/badguy', 'ecdsa', keys + ) + + # /tmp/sshd_config + options = "/tmp/etc/ssh/userkeys/%u .ssh/authorized_keys" + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + self.execute_and_check( + user_badguy, sshd_config, authorized_keys2, keys + ) + + @patch("cloudinit.util.get_user_groups") + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_accessible_file( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, + m_get_user_groups + ): + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), + + '/tmp/etc': ('root', 'root', 0o755), + '/tmp/etc/ssh': ('root', 'root', 0o755), + '/tmp/etc/ssh/userkeys': ('root', 'root', 0o755), + '/tmp/etc/ssh/userkeys/bobby': ('bobby', 'bobby', 0o600), + '/tmp/etc/ssh/userkeys/badguy': ('badguy', 'badguy', 0o600), + + '/tmp/home/badguy': ('badguy', 'badguy', 0o700), + '/tmp/home/badguy/.ssh': ('badguy', 'badguy', 0o700), + '/tmp/home/badguy/.ssh/authorized_keys': + ('badguy', 'badguy', 0o600), + } + + user_bobby = 'bobby' + user_badguy = 'badguy' + homes = self.create_fake_users( + [user_bobby, user_badguy], mock_permissions, m_get_group, + m_get_owner, m_get_permissions, m_getpwnam, users + ) + m_get_user_groups.side_effect = mock_get_user_groups + home_bobby = homes[0] + home_badguy = homes[1] + + # /tmp/home/bobby/.ssh/authorized_keys = rsa + self.create_user_authorized_file( + home_bobby, 'authorized_keys', 'rsa', keys + ) + # /tmp/etc/ssh/userkeys/bobby = dsa + # assume here that we can bypass userkeys, despite permissions + authorized_keys = self.create_global_authorized_file( + 'etc/ssh/userkeys/bobby', 'dsa', keys + ) + + # /tmp/home/badguy/.ssh/authorized_keys = ssh-xmss@openssh.com + self.create_user_authorized_file( + home_badguy, 'authorized_keys', 'ssh-xmss@openssh.com', keys + ) + + # /tmp/etc/ssh/userkeys/badguy = ecdsa + authorized_keys2 = self.create_global_authorized_file( + 'etc/ssh/userkeys/badguy', 'ecdsa', keys + ) + + # /tmp/sshd_config + options = "/tmp/etc/ssh/userkeys/%u .ssh/authorized_keys" + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + self.execute_and_check( + user_badguy, sshd_config, authorized_keys2, keys + ) + + @patch("cloudinit.util.get_user_groups") + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_hardcoded_single_user_file( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, + m_get_user_groups + ): + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), + + '/tmp/home/suzie': ('suzie', 'suzie', 0o700), + '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700), + '/tmp/home/suzie/.ssh/authorized_keys': ('suzie', 'suzie', 0o600), + } + + user_bobby = 'bobby' + user_suzie = 'suzie' + homes = self.create_fake_users( + [user_bobby, user_suzie], mock_permissions, m_get_group, + m_get_owner, m_get_permissions, m_getpwnam, users + ) + home_bobby = homes[0] + home_suzie = homes[1] + m_get_user_groups.side_effect = mock_get_user_groups + + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home_bobby, 'authorized_keys', 'rsa', keys + ) + + # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com + self.create_user_authorized_file( + home_suzie, 'authorized_keys', 'ssh-xmss@openssh.com', keys + ) + + # /tmp/sshd_config + options = "%s" % (authorized_keys) + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + default = "%s/.ssh/authorized_keys" % home_suzie + self.execute_and_check(user_suzie, sshd_config, default, keys) + + @patch("cloudinit.util.get_user_groups") + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_hardcoded_single_user_file_inverted( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, + m_get_user_groups + ): + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), + + '/tmp/home/suzie': ('suzie', 'suzie', 0o700), + '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700), + '/tmp/home/suzie/.ssh/authorized_keys': ('suzie', 'suzie', 0o600), + } + + user_bobby = 'bobby' + user_suzie = 'suzie' + homes = self.create_fake_users( + [user_bobby, user_suzie], mock_permissions, m_get_group, + m_get_owner, m_get_permissions, m_getpwnam, users + ) + home_bobby = homes[0] + home_suzie = homes[1] + m_get_user_groups.side_effect = mock_get_user_groups + + # /tmp/home/bobby/.ssh/authorized_keys = rsa + self.create_user_authorized_file( + home_bobby, 'authorized_keys', 'rsa', keys + ) + + # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com + authorized_keys2 = self.create_user_authorized_file( + home_suzie, 'authorized_keys', 'ssh-xmss@openssh.com', keys + ) + + # /tmp/sshd_config + options = "%s" % (authorized_keys2) + sshd_config = self.create_sshd_config(options) + + default = "%s/.ssh/authorized_keys" % home_bobby + self.execute_and_check( + user_bobby, sshd_config, default, keys, delete_keys=False + ) + self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) + + @patch("cloudinit.util.get_user_groups") + @patch("cloudinit.ssh_util.pwd.getpwnam") + @patch("cloudinit.util.get_permissions") + @patch("cloudinit.util.get_owner") + @patch("cloudinit.util.get_group") + def test_two_users_hardcoded_user_files( + self, m_get_group, m_get_owner, m_get_permissions, m_getpwnam, + m_get_user_groups + ): + keys = {} + users = {} + mock_permissions = { + '/tmp/home/bobby': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh': ('bobby', 'bobby', 0o700), + '/tmp/home/bobby/.ssh/authorized_keys': ('bobby', 'bobby', 0o600), + + '/tmp/home/suzie': ('suzie', 'suzie', 0o700), + '/tmp/home/suzie/.ssh': ('suzie', 'suzie', 0o700), + '/tmp/home/suzie/.ssh/authorized_keys': ('suzie', 'suzie', 0o600), + } + + user_bobby = 'bobby' + user_suzie = 'suzie' + homes = self.create_fake_users( + [user_bobby, user_suzie], mock_permissions, m_get_group, + m_get_owner, m_get_permissions, m_getpwnam, users + ) + home_bobby = homes[0] + home_suzie = homes[1] + m_get_user_groups.side_effect = mock_get_user_groups + + # /tmp/home/bobby/.ssh/authorized_keys = rsa + authorized_keys = self.create_user_authorized_file( + home_bobby, 'authorized_keys', 'rsa', keys + ) + + # /tmp/home/suzie/.ssh/authorized_keys = ssh-xmss@openssh.com + authorized_keys2 = self.create_user_authorized_file( + home_suzie, 'authorized_keys', 'ssh-xmss@openssh.com', keys + ) + + # /tmp/etc/ssh/authorized_keys = ecdsa + authorized_keys_global = self.create_global_authorized_file( + 'etc/ssh/authorized_keys', 'ecdsa', keys + ) + + # /tmp/sshd_config + options = "%s %s %s" % \ + (authorized_keys_global, authorized_keys, authorized_keys2) + sshd_config = self.create_sshd_config(options) + + self.execute_and_check( + user_bobby, sshd_config, authorized_keys, keys, delete_keys=False + ) + self.execute_and_check(user_suzie, sshd_config, authorized_keys2, keys) # vi: ts=4 expandtab