From 4252a5f6f0ca225afb78da86da7cbaeb4f1c6018 Mon Sep 17 00:00:00 2001 From: Alberto Contreras Date: Fri, 13 May 2022 09:26:51 +0200 Subject: [PATCH 1/2] Drop support of *-sk keys. Delete *-sk keys from cloud-init-schema.json under cc_ssh.{ssh_keys,ssh_genkeytypes} Log a warning if some given key is unsupported or unknown. Port tests to Pytests, add some types and increase unittest coverage. --- cloudinit/config/cc_ssh.py | 45 +- cloudinit/config/cloud-init-schema.json | 6 +- tests/unittests/config/test_cc_ssh.py | 563 +++++++++++------------- 3 files changed, 284 insertions(+), 330 deletions(-) diff --git a/cloudinit/config/cc_ssh.py b/cloudinit/config/cc_ssh.py index ac5640b709d..33c1fd0cc08 100644 --- a/cloudinit/config/cc_ssh.py +++ b/cloudinit/config/cc_ssh.py @@ -9,10 +9,14 @@ import glob import os +import re import sys +from logging import Logger from textwrap import dedent +from typing import List, Optional, Sequence from cloudinit import ssh_util, subp, util +from cloudinit.cloud import Cloud from cloudinit.config.schema import MetaSchema, get_meta_doc from cloudinit.distros import ALL_DISTROS, ug_util from cloudinit.settings import PER_INSTANCE @@ -103,12 +107,22 @@ - dsa - ecdsa - - ecdsa-sk - ed25519 - - ed25519-sk - rsa + +Unsupported host key types for the ``ssh_keys`` and the ``ssh_genkeytypes`` +config flags are: + + - ecdsa-sk + - ed25519-sk """ +# Note: We do not support *-sk key types because: +# 1) In the autogeneration case user interaction with the device is needed +# which does not fit with a cloud-context. +# 2) This type of keys are user-based, not hostkeys. + + meta: MetaSchema = { "id": "cc_ssh", "name": "SSH", @@ -156,6 +170,9 @@ __doc__ = get_meta_doc(meta) GENERATE_KEY_NAMES = ["rsa", "dsa", "ecdsa", "ed25519"] +pattern_unsupported_config_keys = re.compile( + "^(ecdsa-sk|ed25519-sk)_(private|public|certificate)$" +) KEY_FILE_TPL = "/etc/ssh/ssh_host_%s_key" PUBLISH_HOST_KEYS = True # Don't publish the dsa hostkey by default since OpenSSH recommends not using @@ -165,19 +182,19 @@ CONFIG_KEY_TO_FILE = {} PRIV_TO_PUB = {} for k in GENERATE_KEY_NAMES: - CONFIG_KEY_TO_FILE.update({"%s_private" % k: (KEY_FILE_TPL % k, 0o600)}) - CONFIG_KEY_TO_FILE.update( - {"%s_public" % k: (KEY_FILE_TPL % k + ".pub", 0o600)} - ) CONFIG_KEY_TO_FILE.update( - {"%s_certificate" % k: (KEY_FILE_TPL % k + "-cert.pub", 0o600)} + { + f"{k}_private": (KEY_FILE_TPL % k, 0o600), + f"{k}_public": (f"{KEY_FILE_TPL % k}.pub", 0o600), + f"{k}_certificate": (f"{KEY_FILE_TPL % k}-cert.pub", 0o600), + } ) - PRIV_TO_PUB["%s_private" % k] = "%s_public" % k + PRIV_TO_PUB[f"{k}_private"] = f"{k}_public" KEY_GEN_TPL = 'o=$(ssh-keygen -yf "%s") && echo "$o" root@localhost > "%s"' -def handle(_name, cfg, cloud, log, _args): +def handle(_name, cfg, cloud: Cloud, log: Logger, _args): # remove the static keys from the pristine image if cfg.get("ssh_deletekeys", True): @@ -191,8 +208,12 @@ def handle(_name, cfg, cloud, log, _args): if "ssh_keys" in cfg: # if there are keys and/or certificates in cloud-config, use them for (key, val) in cfg["ssh_keys"].items(): - # skip entry if unrecognized if key not in CONFIG_KEY_TO_FILE: + if pattern_unsupported_config_keys.match(key): + reason = "unsupported" + else: + reason = "unrecognized" + log.warning("Skipping %s ssh_keys" ' entry: "%s"', reason, key) continue tgt_fn = CONFIG_KEY_TO_FILE[key][0] tgt_perms = CONFIG_KEY_TO_FILE[key][1] @@ -297,7 +318,7 @@ def handle(_name, cfg, cloud, log, _args): cfg, "disable_root_opts", ssh_util.DISABLE_USER_OPTS ) - keys = [] + keys: List[str] = [] if util.get_cfg_option_bool(cfg, "allow_public_ssh_keys", True): keys = cloud.get_public_ssh_keys() or [] else: @@ -332,7 +353,7 @@ def apply_credentials(keys, user, disable_root, disable_root_opts): ssh_util.setup_user_keys(keys, "root", options=key_prefix) -def get_public_host_keys(blacklist=None): +def get_public_host_keys(blacklist: Optional[Sequence[str]] = None): """Read host keys from /etc/ssh/*.pub files and return them as a list. @param blacklist: List of key types to ignore. e.g. ['dsa', 'rsa'] diff --git a/cloudinit/config/cloud-init-schema.json b/cloudinit/config/cloud-init-schema.json index 13c9efb1b28..70e61478875 100644 --- a/cloudinit/config/cloud-init-schema.json +++ b/cloudinit/config/cloud-init-schema.json @@ -1886,7 +1886,7 @@ "type": "object", "description": "A dictionary entries for the public and private host keys of each desired key type. Entries in the ``ssh_keys`` config dict should have keys in the format ``_private``, ``_public``, and, optionally, ``_certificate``, e.g. ``rsa_private: ``, ``rsa_public: ``, and ``rsa_certificate: ``. Not all key types have to be specified, ones left unspecified will not be used. If this config option is used, then separate keys will not be automatically generated. In order to specify multiline private host keys and certificates, use yaml multiline syntax.", "patternProperties": { - "^(dsa|ecdsa|ecdsa-sk|ed25519|ed25519-sk|rsa)_(public|private|certificate)$": { + "^(dsa|ecdsa|ed25519|rsa)_(public|private|certificate)$": { "label": "", "type": "string" } @@ -1909,11 +1909,11 @@ "ssh_genkeytypes": { "type": "array", "description": "The SSH key types to generate. Default: ``[rsa, dsa, ecdsa, ed25519]``", - "default": ["dsa", "ecdsa", "ecdsa-sk", "ed25519", "ed25519-sk", "rsa"], + "default": ["dsa", "ecdsa", "ed25519", "rsa"], "minItems": 1, "items": { "type": "string", - "enum": ["dsa", "ecdsa", "ecdsa-sk", "ed25519", "ed25519-sk", "rsa"] + "enum": ["dsa", "ecdsa", "ed25519", "rsa"] } }, "disable_root": { diff --git a/tests/unittests/config/test_cc_ssh.py b/tests/unittests/config/test_cc_ssh.py index 5889e1805af..9d157338290 100644 --- a/tests/unittests/config/test_cc_ssh.py +++ b/tests/unittests/config/test_cc_ssh.py @@ -2,6 +2,8 @@ import logging import os.path +from typing import Optional +from unittest import mock import pytest @@ -12,7 +14,8 @@ get_schema, validate_cloudconfig_schema, ) -from tests.unittests.helpers import CiTestCase, mock, skipUnlessJsonSchema +from tests.unittests.helpers import skipUnlessJsonSchema +from tests.unittests.util import get_cloud LOG = logging.getLogger(__name__) @@ -22,80 +25,92 @@ ] -@mock.patch(MODPATH + "ssh_util.setup_user_keys") -class TestHandleSsh(CiTestCase): - """Test cc_ssh handling of ssh config.""" +@pytest.fixture(scope="function") +def _publish_hostkey_test_setup(tmpdir): + test_hostkeys = { + "dsa": ("ssh-dss", "AAAAB3NzaC1kc3MAAACB"), + "ecdsa": ("ecdsa-sha2-nistp256", "AAAAE2VjZ"), + "ed25519": ("ssh-ed25519", "AAAAC3NzaC1lZDI"), + "rsa": ("ssh-rsa", "AAAAB3NzaC1yc2EAAA"), + } + test_hostkey_files = [] + hostkey_tmpdir = tmpdir + for key_type in cc_ssh.GENERATE_KEY_NAMES: + filename = "ssh_host_%s_key.pub" % key_type + filepath = os.path.join(hostkey_tmpdir, filename) + test_hostkey_files.append(filepath) + with open(filepath, "w") as f: + f.write(" ".join(test_hostkeys[key_type])) + + cc_ssh.KEY_FILE_TPL = os.path.join(hostkey_tmpdir, "ssh_host_%s_key") + yield test_hostkeys, test_hostkey_files + + +def _replace_options(user: Optional[str] = None) -> str: + options = ssh_util.DISABLE_USER_OPTS + if user: + new_user = user + else: + new_user = "NONE" + options = options.replace("$USER", new_user) + options = options.replace("$DISABLE_USER", "root") + return options - def _publish_hostkey_test_setup(self): - self.test_hostkeys = { - "dsa": ("ssh-dss", "AAAAB3NzaC1kc3MAAACB"), - "ecdsa": ("ecdsa-sha2-nistp256", "AAAAE2VjZ"), - "ed25519": ("ssh-ed25519", "AAAAC3NzaC1lZDI"), - "rsa": ("ssh-rsa", "AAAAB3NzaC1yc2EAAA"), - } - self.test_hostkey_files = [] - hostkey_tmpdir = self.tmp_dir() - for key_type in cc_ssh.GENERATE_KEY_NAMES: - key_data = self.test_hostkeys[key_type] - filename = "ssh_host_%s_key.pub" % key_type - filepath = os.path.join(hostkey_tmpdir, filename) - self.test_hostkey_files.append(filepath) - with open(filepath, "w") as f: - f.write(" ".join(key_data)) - - cc_ssh.KEY_FILE_TPL = os.path.join(hostkey_tmpdir, "ssh_host_%s_key") - - def test_apply_credentials_with_user(self, m_setup_keys): - """Apply keys for the given user and root.""" - keys = ["key1"] - user = "clouduser" - cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS) - self.assertEqual( - [ - mock.call(set(keys), user), - mock.call(set(keys), "root", options=""), - ], - m_setup_keys.call_args_list, - ) - - def test_apply_credentials_with_no_user(self, m_setup_keys): - """Apply keys for root only.""" - keys = ["key1"] - user = None - cc_ssh.apply_credentials(keys, user, False, ssh_util.DISABLE_USER_OPTS) - self.assertEqual( - [mock.call(set(keys), "root", options="")], - m_setup_keys.call_args_list, - ) - def test_apply_credentials_with_user_disable_root(self, m_setup_keys): - """Apply keys for the given user and disable root ssh.""" - keys = ["key1"] - user = "clouduser" - options = ssh_util.DISABLE_USER_OPTS - cc_ssh.apply_credentials(keys, user, True, options) - options = options.replace("$USER", user) - options = options.replace("$DISABLE_USER", "root") - self.assertEqual( - [ - mock.call(set(keys), user), - mock.call(set(keys), "root", options=options), - ], - m_setup_keys.call_args_list, - ) +@mock.patch(MODPATH + "ssh_util.setup_user_keys") +class TestHandleSsh: + """Test cc_ssh handling of ssh config.""" - def test_apply_credentials_with_no_user_disable_root(self, m_setup_keys): - """Apply keys no user and disable root ssh.""" - keys = ["key1"] - user = None + @pytest.mark.parametrize( + "args,kwargs", + [ + # For the given user and root. + pytest.param([["key1"], "clouduser", False], {}, id="with_user"), + # For root only. + pytest.param([["key1"], None, False], {}, id="with_no_user"), + # For the given user and disable root ssh. + pytest.param( + [ + ["key1"], + "clouduser", + True, + ], + {}, + id="with_user_disable_root", + ), + # No user and disable root ssh. + pytest.param( + [ + ["key1"], + None, + True, + ], + {}, + id="with_no_user_disable_root", + ), + ], + ) + def test_apply_credentials( + self, + m_setup_keys, + args, + kwargs, + ): + keys, user, *args, disable_root_opts = args options = ssh_util.DISABLE_USER_OPTS - cc_ssh.apply_credentials(keys, user, True, options) - options = options.replace("$USER", "NONE") - options = options.replace("$DISABLE_USER", "root") - self.assertEqual( - [mock.call(set(keys), "root", options=options)], - m_setup_keys.call_args_list, + cc_ssh.apply_credentials( + keys, user, *args, disable_root_opts, options, **kwargs ) + if not disable_root_opts: + expected_options = "" + else: + expected_options = _replace_options(user) + expected_calls = [ + mock.call(set(keys), "root", options=expected_options) + ] + if user: + expected_calls = [mock.call(set(keys), user)] + expected_calls + assert expected_calls == m_setup_keys.call_args_list @mock.patch(MODPATH + "glob.glob") @mock.patch(MODPATH + "ug_util.normalize_users_groups") @@ -109,24 +124,20 @@ def test_handle_no_cfg(self, m_path_exists, m_nug, m_glob, m_setup_keys): m_path_exists.return_value = True m_nug.return_value = ([], {}) cc_ssh.PUBLISH_HOST_KEYS = False - cloud = self.tmp_cloud(distro="ubuntu", metadata={"public-keys": keys}) + cloud = get_cloud(distro="ubuntu", metadata={"public-keys": keys}) cc_ssh.handle("name", cfg, cloud, LOG, None) options = ssh_util.DISABLE_USER_OPTS.replace("$USER", "NONE") options = options.replace("$DISABLE_USER", "root") m_glob.assert_called_once_with("/etc/ssh/ssh_host_*key*") - self.assertIn( - [ - mock.call("/etc/ssh/ssh_host_rsa_key"), - mock.call("/etc/ssh/ssh_host_dsa_key"), - mock.call("/etc/ssh/ssh_host_ecdsa_key"), - mock.call("/etc/ssh/ssh_host_ed25519_key"), - ], - m_path_exists.call_args_list, - ) - self.assertEqual( - [mock.call(set(keys), "root", options=options)], - m_setup_keys.call_args_list, - ) + assert [ + mock.call("/etc/ssh/ssh_host_rsa_key"), + mock.call("/etc/ssh/ssh_host_dsa_key"), + mock.call("/etc/ssh/ssh_host_ecdsa_key"), + mock.call("/etc/ssh/ssh_host_ed25519_key"), + ] in m_path_exists.call_args_list + assert [ + mock.call(set(keys), "root", options=options) + ] == m_setup_keys.call_args_list @mock.patch(MODPATH + "glob.glob") @mock.patch(MODPATH + "ug_util.normalize_users_groups") @@ -144,207 +155,115 @@ def test_dont_allow_public_ssh_keys( # Mock os.path.exits to True to short-circuit the key writing logic m_path_exists.return_value = True m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud(distro="ubuntu", metadata={"public-keys": keys}) - cc_ssh.handle("name", cfg, cloud, LOG, None) - - options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user) - options = options.replace("$DISABLE_USER", "root") - self.assertEqual( - [ - mock.call(set(), user), - mock.call(set(), "root", options=options), - ], - m_setup_keys.call_args_list, - ) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_no_cfg_and_default_root( - self, m_path_exists, m_nug, m_glob, m_setup_keys - ): - """Test handle with no config and a default distro user.""" - cfg = {} - keys = ["key1"] - user = "clouduser" - m_glob.return_value = [] # Return no matching keys to prevent removal - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud(distro="ubuntu", metadata={"public-keys": keys}) - cc_ssh.handle("name", cfg, cloud, LOG, None) - - options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user) - options = options.replace("$DISABLE_USER", "root") - self.assertEqual( - [ - mock.call(set(keys), user), - mock.call(set(keys), "root", options=options), - ], - m_setup_keys.call_args_list, - ) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_cfg_with_explicit_disable_root( - self, m_path_exists, m_nug, m_glob, m_setup_keys - ): - """Test handle with explicit disable_root and a default distro user.""" - # This test is identical to test_handle_no_cfg_and_default_root, - # except this uses an explicit cfg value - cfg = {"disable_root": True} - keys = ["key1"] - user = "clouduser" - m_glob.return_value = [] # Return no matching keys to prevent removal - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud(distro="ubuntu", metadata={"public-keys": keys}) + cloud = get_cloud(distro="ubuntu", metadata={"public-keys": keys}) cc_ssh.handle("name", cfg, cloud, LOG, None) options = ssh_util.DISABLE_USER_OPTS.replace("$USER", user) options = options.replace("$DISABLE_USER", "root") - self.assertEqual( - [ - mock.call(set(keys), user), - mock.call(set(keys), "root", options=options), - ], - m_setup_keys.call_args_list, - ) + assert [ + mock.call(set(), user), + mock.call(set(), "root", options=options), + ] == m_setup_keys.call_args_list + @pytest.mark.parametrize( + "cfg,mock_get_public_ssh_keys,empty_opts", + [ + pytest.param({}, False, False, id="no_cfg"), + pytest.param( + {"disable_root": True}, + False, + False, + id="explicit_disable_root", + ), + # When disable_root == False, the ssh redirect for root is skipped + pytest.param( + {"disable_root": False}, + True, + True, + id="cfg_without_disable_root", + ), + ], + ) @mock.patch(MODPATH + "glob.glob") @mock.patch(MODPATH + "ug_util.normalize_users_groups") @mock.patch(MODPATH + "os.path.exists") - def test_handle_cfg_without_disable_root( - self, m_path_exists, m_nug, m_glob, m_setup_keys + def test_handle_default_root( + self, + m_path_exists, + m_nug, + m_glob, + m_setup_keys, + cfg, + mock_get_public_ssh_keys, + empty_opts, ): - """Test handle with disable_root == False.""" - # When disable_root == False, the ssh redirect for root is skipped - cfg = {"disable_root": False} + """Test handle with a default distro user.""" keys = ["key1"] user = "clouduser" m_glob.return_value = [] # Return no matching keys to prevent removal # Mock os.path.exits to True to short-circuit the key writing logic m_path_exists.return_value = True m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud(distro="ubuntu", metadata={"public-keys": keys}) - cloud.get_public_ssh_keys = mock.Mock(return_value=keys) + cloud = get_cloud(distro="ubuntu", metadata={"public-keys": keys}) + if mock_get_public_ssh_keys: + cloud.get_public_ssh_keys = mock.Mock(return_value=keys) cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual( - [ - mock.call(set(keys), user), - mock.call(set(keys), "root", options=""), - ], - m_setup_keys.call_args_list, - ) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_default( - self, m_path_exists, m_nug, m_glob, m_setup_keys - ): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = True - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. - m_glob.side_effect = iter( - [ - [], - self.test_hostkey_files, - ] - ) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud(distro="ubuntu", metadata={"public-keys": keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {} - expected_call = [ - self.test_hostkeys[key_type] for key_type in KEY_NAMES_NO_DSA - ] - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual( - [mock.call(expected_call)], - cloud.datasource.publish_host_keys.call_args_list, - ) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_config_enable( - self, m_path_exists, m_nug, m_glob, m_setup_keys - ): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = False - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. - m_glob.side_effect = iter( - [ - [], - self.test_hostkey_files, - ] - ) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud(distro="ubuntu", metadata={"public-keys": keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {"ssh_publish_hostkeys": {"enabled": True}} - expected_call = [ - self.test_hostkeys[key_type] for key_type in KEY_NAMES_NO_DSA - ] - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual( - [mock.call(expected_call)], - cloud.datasource.publish_host_keys.call_args_list, - ) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_config_disable( - self, m_path_exists, m_nug, m_glob, m_setup_keys - ): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = True - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. - m_glob.side_effect = iter( - [ - [], - self.test_hostkey_files, - ] - ) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud(distro="ubuntu", metadata={"public-keys": keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {"ssh_publish_hostkeys": {"enabled": False}} - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertFalse(cloud.datasource.publish_host_keys.call_args_list) - cloud.datasource.publish_host_keys.assert_not_called() + if empty_opts: + options = "" + else: + options = _replace_options(user) + assert [ + mock.call(set(keys), user), + mock.call(set(keys), "root", options=options), + ] == m_setup_keys.call_args_list + @pytest.mark.parametrize( + "cfg, expected_key_types", + [ + pytest.param({}, KEY_NAMES_NO_DSA, id="default"), + pytest.param( + {"ssh_publish_hostkeys": {"enabled": True}}, + KEY_NAMES_NO_DSA, + id="config_enable", + ), + pytest.param( + {"ssh_publish_hostkeys": {"enabled": False}}, + None, + id="config_disable", + ), + pytest.param( + { + "ssh_publish_hostkeys": { + "enabled": True, + "blacklist": ["dsa", "rsa"], + } + }, + ["ecdsa", "ed25519"], + id="config_blacklist", + ), + pytest.param( + {"ssh_publish_hostkeys": {"enabled": True, "blacklist": []}}, + cc_ssh.GENERATE_KEY_NAMES, + id="empty_blacklist", + ), + ], + ) @mock.patch(MODPATH + "glob.glob") @mock.patch(MODPATH + "ug_util.normalize_users_groups") @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_config_blacklist( - self, m_path_exists, m_nug, m_glob, m_setup_keys + def test_handle_publish_hostkeys( + self, + m_path_exists, + m_nug, + m_glob, + m_setup_keys, + _publish_hostkey_test_setup, + cfg, + expected_key_types, ): """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() + test_hostkeys, test_hostkey_files = _publish_hostkey_test_setup cc_ssh.PUBLISH_HOST_KEYS = True keys = ["key1"] user = "clouduser" @@ -352,63 +271,28 @@ def test_handle_publish_hostkeys_config_blacklist( m_glob.side_effect = iter( [ [], - self.test_hostkey_files, + test_hostkey_files, ] ) # Mock os.path.exits to True to short-circuit the key writing logic m_path_exists.return_value = True m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud(distro="ubuntu", metadata={"public-keys": keys}) + cloud = get_cloud(distro="ubuntu", metadata={"public-keys": keys}) cloud.datasource.publish_host_keys = mock.Mock() - cfg = { - "ssh_publish_hostkeys": { - "enabled": True, - "blacklist": ["dsa", "rsa"], - } - } - expected_call = [ - self.test_hostkeys[key_type] for key_type in ["ecdsa", "ed25519"] - ] - cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual( - [mock.call(expected_call)], - cloud.datasource.publish_host_keys.call_args_list, - ) - - @mock.patch(MODPATH + "glob.glob") - @mock.patch(MODPATH + "ug_util.normalize_users_groups") - @mock.patch(MODPATH + "os.path.exists") - def test_handle_publish_hostkeys_empty_blacklist( - self, m_path_exists, m_nug, m_glob, m_setup_keys - ): - """Test handle with various configs for ssh_publish_hostkeys.""" - self._publish_hostkey_test_setup() - cc_ssh.PUBLISH_HOST_KEYS = True - keys = ["key1"] - user = "clouduser" - # Return no matching keys for first glob, test keys for second. - m_glob.side_effect = iter( - [ - [], - self.test_hostkey_files, + expected_calls = [] + if expected_key_types is not None: + expected_calls = [ + mock.call( + [ + test_hostkeys[key_type] + for key_type in expected_key_types + ] + ) ] - ) - # Mock os.path.exits to True to short-circuit the key writing logic - m_path_exists.return_value = True - m_nug.return_value = ({user: {"default": user}}, {}) - cloud = self.tmp_cloud(distro="ubuntu", metadata={"public-keys": keys}) - cloud.datasource.publish_host_keys = mock.Mock() - - cfg = {"ssh_publish_hostkeys": {"enabled": True, "blacklist": []}} - expected_call = [ - self.test_hostkeys[key_type] - for key_type in cc_ssh.GENERATE_KEY_NAMES - ] cc_ssh.handle("name", cfg, cloud, LOG, None) - self.assertEqual( - [mock.call(expected_call)], - cloud.datasource.publish_host_keys.call_args_list, + assert ( + expected_calls == cloud.datasource.publish_host_keys.call_args_list ) @mock.patch(MODPATH + "ug_util.normalize_users_groups") @@ -425,7 +309,7 @@ def test_handle_ssh_keys_in_cfg(self, m_write_file, m_nug, m_setup_keys): public_name = "{}_public".format(key_type) cert_name = "{}_certificate".format(key_type) - # Actual key contents don"t have to be realistic + # Actual key contents don't have to be realistic private_value = "{}_PRIVATE_KEY".format(key_type) public_value = "{}_PUBLIC_KEY".format(key_type) cert_value = "{}_CERT_KEY".format(key_type) @@ -465,13 +349,54 @@ def test_handle_ssh_keys_in_cfg(self, m_write_file, m_nug, m_setup_keys): with mock.patch( MODPATH + "ssh_util.parse_ssh_config", return_value=[] ): - cc_ssh.handle( - "name", cfg, self.tmp_cloud(distro="ubuntu"), LOG, None - ) + cc_ssh.handle("name", cfg, get_cloud(distro="ubuntu"), LOG, None) # Check that all expected output has been done. for call_ in expected_calls: - self.assertIn(call_, m_write_file.call_args_list) + assert call_ in m_write_file.call_args_list + + @pytest.mark.parametrize( + "key_type,reason", + [ + ("ecdsa-sk", "unsupported"), + ("ed25519-sk", "unsupported"), + ("public", "unrecognized"), + ], + ) + @mock.patch(MODPATH + "ug_util.normalize_users_groups") + @mock.patch(MODPATH + "util.write_file") + def test_handle_invalid_ssh_keys_are_skipped( + self, + m_write_file, + m_nug, + m_setup_keys, + key_type, + reason, + caplog, + ): + cfg = { + "ssh_keys": { + f"{key_type}_private": f"{key_type}_private", + f"{key_type}_public": f"{key_type}_public", + f"{key_type}_certificate": f"{key_type}_certificate", + }, + "ssh_deletekeys": False, + "ssh_publish_hostkeys": {"enabled": False}, + } + # Run the handler. + m_nug.return_value = ([], {}) + with mock.patch( + MODPATH + "ssh_util.parse_ssh_config", return_value=[] + ): + cc_ssh.handle("name", cfg, get_cloud("ubuntu"), LOG, None) + assert [] == m_write_file.call_args_list + expected_log_msgs = [ + f'Skipping {reason} ssh_keys entry: "{key_type}_private"', + f'Skipping {reason} ssh_keys entry: "{key_type}_public"', + f'Skipping {reason} ssh_keys entry: "{key_type}_certificate"', + ] + for expected_log_msg in expected_log_msgs: + assert caplog.text.count(expected_log_msg) == 1 class TestSshSchema: @@ -491,6 +416,14 @@ class TestSshSchema: {"ssh_keys": {"a_public": "key"}}, "'a_public' does not match any of the regexes", ), + ( + {"ssh_keys": {"ecdsa-sk_public": "key"}}, + "'ecdsa-sk_public' does not match any of the regexes", + ), + ( + {"ssh_keys": {"ed25519-sk_public": "key"}}, + "'ed25519-sk_public' does not match any of the regexes", + ), ( {"ssh_authorized_keys": "ssh-rsa blah"}, "'ssh-rsa blah' is not of type 'array'", From 192795ce5b1c7fc620b39ce2d014192c813ae436 Mon Sep 17 00:00:00 2001 From: Alberto Contreras Date: Fri, 13 May 2022 18:12:28 +0200 Subject: [PATCH 2/2] Simplify tests. --- tests/unittests/config/test_cc_ssh.py | 40 ++++++++++----------------- 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/tests/unittests/config/test_cc_ssh.py b/tests/unittests/config/test_cc_ssh.py index 9d157338290..47c0c7770b6 100644 --- a/tests/unittests/config/test_cc_ssh.py +++ b/tests/unittests/config/test_cc_ssh.py @@ -26,7 +26,7 @@ @pytest.fixture(scope="function") -def _publish_hostkey_test_setup(tmpdir): +def publish_hostkey_test_setup(tmpdir): test_hostkeys = { "dsa": ("ssh-dss", "AAAAB3NzaC1kc3MAAACB"), "ecdsa": ("ecdsa-sha2-nistp256", "AAAAE2VjZ"), @@ -62,45 +62,33 @@ class TestHandleSsh: """Test cc_ssh handling of ssh config.""" @pytest.mark.parametrize( - "args,kwargs", + "keys,user,disable_root_opts", [ # For the given user and root. - pytest.param([["key1"], "clouduser", False], {}, id="with_user"), + pytest.param(["key1"], "clouduser", False, id="with_user"), # For root only. - pytest.param([["key1"], None, False], {}, id="with_no_user"), + pytest.param(["key1"], None, False, id="with_no_user"), # For the given user and disable root ssh. pytest.param( - [ - ["key1"], - "clouduser", - True, - ], - {}, + ["key1"], + "clouduser", + True, id="with_user_disable_root", ), # No user and disable root ssh. pytest.param( - [ - ["key1"], - None, - True, - ], - {}, + ["key1"], + None, + True, id="with_no_user_disable_root", ), ], ) def test_apply_credentials( - self, - m_setup_keys, - args, - kwargs, + self, m_setup_keys, keys, user, disable_root_opts ): - keys, user, *args, disable_root_opts = args options = ssh_util.DISABLE_USER_OPTS - cc_ssh.apply_credentials( - keys, user, *args, disable_root_opts, options, **kwargs - ) + cc_ssh.apply_credentials(keys, user, disable_root_opts, options) if not disable_root_opts: expected_options = "" else: @@ -258,12 +246,12 @@ def test_handle_publish_hostkeys( m_nug, m_glob, m_setup_keys, - _publish_hostkey_test_setup, + publish_hostkey_test_setup, cfg, expected_key_types, ): """Test handle with various configs for ssh_publish_hostkeys.""" - test_hostkeys, test_hostkey_files = _publish_hostkey_test_setup + test_hostkeys, test_hostkey_files = publish_hostkey_test_setup cc_ssh.PUBLISH_HOST_KEYS = True keys = ["key1"] user = "clouduser"