Skip to content
8 changes: 6 additions & 2 deletions cloudinit/config/cc_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,13 @@ def handle(name: str, cfg: Config, cloud: Cloud, args: list) -> None:
gid = util.get_group_id("ssh_keys")
if gid != -1:
# perform same "sanitize permissions" as sshd-keygen
permissions_private = 0o600
ssh_version = ssh_util.get_opensshd_upstream_version()
if ssh_version and ssh_version < util.Version(9, 0):
permissions_private = 0o640
os.chown(keyfile, -1, gid)
os.chmod(keyfile, 0o640)
os.chmod(keyfile + ".pub", 0o644)
os.chmod(keyfile, permissions_private)
os.chmod(f"{keyfile}.pub", 0o644)
except subp.ProcessExecutionError as e:
err = util.decode_binary(e.stderr).lower()
if e.exit_code == 1 and err.lower().startswith(
Expand Down
48 changes: 47 additions & 1 deletion cloudinit/ssh_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@

import os
import pwd
from contextlib import suppress
from typing import List, Sequence, Tuple

from cloudinit import log as logging
from cloudinit import util
from cloudinit import subp, util

LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -642,4 +643,49 @@ def append_ssh_config(lines: Sequence[Tuple[str, str]], fname=DEF_SSHD_CFG):
)


def get_opensshd_version():
"""Get the full version of the OpenSSH sshd daemon on the system.

On an ubuntu system, this would look something like:
1.2p1 Ubuntu-1ubuntu0.1

If we can't find `sshd` or parse the version number, return None.
"""
# -V isn't actually a valid argument, but it will cause sshd to print
# out its version number to stderr.
err = ""
with suppress(subp.ProcessExecutionError):
_, err = subp.subp(["sshd", "-V"], rcs=[0, 1])
prefix = "OpenSSH_"
for line in err.split("\n"):
if line.startswith(prefix):
return line[len(prefix) : line.find(",")]
return None


def get_opensshd_upstream_version():
"""Get the upstream version of the OpenSSH sshd dameon on the system.

This will NOT include the portable number, so if the Ubuntu version looks
like `1.2p1 Ubuntu-1ubuntu0.1`, then this function would return
`1.2`
"""
# The default version of openssh is not less than 9.0
upstream_version = "9.0"
full_version = get_opensshd_version()
if full_version is None:
return util.Version.from_str(upstream_version)
if "p" in full_version:
upstream_version = full_version[: full_version.find("p")]
elif " " in full_version:
upstream_version = full_version[: full_version.find(" ")]
else:
upstream_version = full_version
try:
upstream_version = util.Version.from_str(upstream_version)
return upstream_version
except (ValueError, TypeError):
LOG.warning("Could not parse sshd version: %s", upstream_version)


# vi: ts=4 expandtab
46 changes: 45 additions & 1 deletion tests/unittests/config/test_cc_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import pytest

from cloudinit import ssh_util
from cloudinit import ssh_util, util
from cloudinit.config import cc_ssh
from cloudinit.config.schema import (
SchemaValidationError,
Expand Down Expand Up @@ -284,6 +284,50 @@ def test_handle_publish_hostkeys(
expected_calls == cloud.datasource.publish_host_keys.call_args_list
)

@pytest.mark.parametrize(
"ssh_keys_group_exists,sshd_version,expected_private_permissions",
[(False, 0, 0), (True, 8, 0o640), (True, 10, 0o600)],
)
@mock.patch(MODPATH + "subp.subp", return_value=("", ""))
@mock.patch(MODPATH + "util.get_group_id", return_value=10)
@mock.patch(MODPATH + "ssh_util.get_opensshd_upstream_version")
@mock.patch(MODPATH + "os.path.exists", return_value=False)
@mock.patch(MODPATH + "os.chown")
@mock.patch(MODPATH + "os.chmod")
def test_ssh_hostkey_permissions(
self,
m_chmod,
m_chown,
m_exists,
m_sshd_version,
m_gid,
m_subp,
m_setup_keys,
ssh_keys_group_exists,
sshd_version,
expected_private_permissions,
):
"""Test our generated hostkeys use same perms as sshd-keygen.

SSHD version < 9.0 should apply 640 permissions to the private key.
Otherwise, 600.
"""
m_gid.return_value = 10 if ssh_keys_group_exists else -1
m_sshd_version.return_value = util.Version(sshd_version, 0)
key_path = cc_ssh.KEY_FILE_TPL % "rsa"
cloud = get_cloud(distro="ubuntu")
cc_ssh.handle("name", {"ssh_genkeytypes": ["rsa"]}, cloud, [])
if ssh_keys_group_exists:
m_chown.assert_called_once_with(key_path, -1, 10)
assert m_chmod.call_args_list == [
mock.call(key_path, expected_private_permissions),
mock.call(f"{key_path}.pub", 0o644),
]
else:
m_sshd_version.assert_not_called()
m_chown.assert_not_called()
m_chmod.assert_not_called()

@pytest.mark.parametrize("with_sshd_dconf", [False, True])
@mock.patch(MODPATH + "util.ensure_dir")
@mock.patch(MODPATH + "ug_util.normalize_users_groups")
Expand Down