Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cloudinit/config/cc_set_passwords.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ def handle_ssh_pwauth(pw_auth, distro: Distro):
if e.exit_code == 3:
# Service is not running. Write ssh config.
LOG.warning(
"Writing config 'ssh_pwauth: %s'."
" SSH service '%s' will not be restarted because is stopped.",
"Writing config 'ssh_pwauth: %s'. SSH service '%s'"
" will not be restarted because it is stopped.",
pw_auth,
service,
)
Expand Down
301 changes: 155 additions & 146 deletions tests/unittests/config/test_cc_set_passwords.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# This file is part of cloud-init. See LICENSE file for license information.

import logging
from unittest import mock

import pytest
Expand All @@ -12,177 +13,183 @@
validate_cloudconfig_schema,
)
from tests.unittests.helpers import CiTestCase, skipUnlessJsonSchema
from tests.unittests.util import get_cloud

MODPATH = "cloudinit.config.cc_set_passwords."


class TestHandleSshPwauth(CiTestCase):
"""Test cc_set_passwords handling of ssh_pwauth in handle_ssh_pwauth."""
@pytest.fixture()
def mock_uses_systemd(mocker):
mocker.patch("cloudinit.distros.uses_systemd", return_value=True)

with_logs = True

class TestHandleSSHPwauth:
@pytest.mark.parametrize(
"uses_systemd,cmd",
(
(True, ["systemctl", "status", "ssh"]),
(False, ["service", "ssh", "status"]),
),
)
@mock.patch("cloudinit.distros.subp.subp")
def test_unknown_value_logs_warning(self, m_subp):
cloud = self.tmp_cloud(distro="ubuntu")
setpass.handle_ssh_pwauth("floo", cloud.distro)
self.assertIn(
"Unrecognized value: ssh_pwauth=floo", self.logs.getvalue()
)
self.assertEqual(
[mock.call(["systemctl", "status", "ssh"], capture=True)],
m_subp.call_args_list,
)

@mock.patch(MODPATH + "update_ssh_config", return_value=True)
@mock.patch("cloudinit.distros.subp.subp")
def test_systemctl_as_service_cmd(self, m_subp, m_update_ssh_config):
"""If systemctl in service cmd: systemctl restart name."""
cloud = self.tmp_cloud(distro="ubuntu")
cloud.distro.init_cmd = ["systemctl"]
setpass.handle_ssh_pwauth(True, cloud.distro)
m_subp.assert_called_with(
["systemctl", "restart", "ssh"], capture=True
)
self.assertIn("DEBUG: Restarted the SSH daemon.", self.logs.getvalue())
def test_unknown_value_logs_warning(
self, m_subp, uses_systemd, cmd, caplog
):
cloud = get_cloud("ubuntu")
with mock.patch.object(
cloud.distro, "uses_systemd", return_value=uses_systemd
):
setpass.handle_ssh_pwauth("floo", cloud.distro)
assert "Unrecognized value: ssh_pwauth=floo" in caplog.text
assert [mock.call(cmd, capture=True)] == m_subp.call_args_list

@mock.patch(MODPATH + "update_ssh_config", return_value=False)
@pytest.mark.parametrize(
"uses_systemd,ssh_updated,cmd,expected_log",
(
(
True,
True,
["systemctl", "restart", "ssh"],
"Restarted the SSH daemon.",
),
(
True,
False,
["systemctl", "status", "ssh"],
"No need to restart SSH",
),
(
False,
True,
["service", "ssh", "restart"],
"Restarted the SSH daemon.",
),
(
False,
False,
["service", "ssh", "status"],
"No need to restart SSH",
),
),
)
@mock.patch(MODPATH + "update_ssh_config")
@mock.patch("cloudinit.distros.subp.subp")
def test_not_restarted_if_not_updated(self, m_subp, m_update_ssh_config):
"""If config is not updated, then no system restart should be done."""
cloud = self.tmp_cloud(distro="ubuntu")
setpass.handle_ssh_pwauth(True, cloud.distro)
self.assertEqual(
[mock.call(["systemctl", "status", "ssh"], capture=True)],
m_subp.call_args_list,
def test_restart_ssh_only_when_changes_made_and_ssh_installed(
self,
m_subp,
update_ssh_config,
uses_systemd,
ssh_updated,
cmd,
expected_log,
caplog,
):
update_ssh_config.return_value = ssh_updated
cloud = get_cloud("ubuntu")
with mock.patch.object(
cloud.distro, "uses_systemd", return_value=uses_systemd
):
setpass.handle_ssh_pwauth(True, cloud.distro)
if ssh_updated:
m_subp.assert_called_with(cmd, capture=True)
else:
assert [mock.call(cmd, capture=True)] == m_subp.call_args_list
assert expected_log in "\n".join(
r.msg for r in caplog.records if r.levelname == "DEBUG"
)
self.assertIn("No need to restart SSH", self.logs.getvalue())

@mock.patch(MODPATH + "update_ssh_config", return_value=True)
@mock.patch("cloudinit.distros.subp.subp")
def test_unchanged_does_nothing(self, m_subp, m_update_ssh_config):
def test_unchanged_value_does_nothing(
self, m_subp, update_ssh_config, mock_uses_systemd
):
"""If 'unchanged', then no updates to config and no restart."""
cloud = self.tmp_cloud(distro="ubuntu")
update_ssh_config.assert_not_called()
cloud = get_cloud("ubuntu")
setpass.handle_ssh_pwauth("unchanged", cloud.distro)
m_update_ssh_config.assert_not_called()
self.assertEqual(m_update_ssh_config.call_count, 0)
self.assertEqual(
[mock.call(["systemctl", "status", "ssh"], capture=True)],
m_subp.call_args_list,
)
assert [
mock.call(["systemctl", "status", "ssh"], capture=True)
] == m_subp.call_args_list

@pytest.mark.allow_subp_for("systemctl")
@mock.patch("cloudinit.distros.subp.subp")
def test_valid_change_values(self, m_subp):
"""If value is a valid changen value, then update should be called."""
cloud = self.tmp_cloud(distro="ubuntu")
def test_valid_value_changes_updates_ssh(self, m_subp, mock_uses_systemd):
"""If value is a valid changed value, then update will be called."""
cloud = get_cloud("ubuntu")
upname = MODPATH + "update_ssh_config"
optname = "PasswordAuthentication"
for n, value in enumerate(util.FALSE_STRINGS + util.TRUE_STRINGS, 1):
optval = "yes" if value in util.TRUE_STRINGS else "no"
with mock.patch(upname, return_value=False) as m_update:
setpass.handle_ssh_pwauth(value, cloud.distro)
self.assertEqual(
mock.call({optname: optval}), m_update.call_args_list[-1]
assert (
mock.call({optname: optval}) == m_update.call_args_list[-1]
)
self.assertEqual(m_subp.call_count, n)
self.assertEqual(
mock.call(["systemctl", "status", "ssh"], capture=True),
m_subp.call_args_list[-1],
)

@mock.patch(MODPATH + "update_ssh_config", return_value=True)
@mock.patch("cloudinit.distros.subp.subp")
def test_failed_ssh_service_is_not_runing(
self, m_subp, m_update_ssh_config
):
"""If the ssh service is not running, then the config is updated and
no restart.
"""
cloud = self.tmp_cloud(distro="ubuntu")
cloud.distro.init_cmd = ["systemctl"]
cloud.distro.manage_service = mock.Mock(
side_effect=subp.ProcessExecutionError(
stderr="Service is not running.", exit_code=3
)
)

setpass.handle_ssh_pwauth(True, cloud.distro)
self.assertIn(
r"WARNING: Writing config 'ssh_pwauth: True'."
r" SSH service 'ssh' will not be restarted because is stopped.",
self.logs.getvalue(),
)
self.assertIn(
r"DEBUG: Not restarting SSH service: service is stopped.",
self.logs.getvalue(),
)
self.assertEqual(
[mock.call("status", "ssh")],
cloud.distro.manage_service.call_args_list,
)
self.assertEqual(m_update_ssh_config.call_count, 1)
self.assertEqual(m_subp.call_count, 0)

@mock.patch(MODPATH + "update_ssh_config", return_value=True)
@mock.patch("cloudinit.distros.subp.subp")
def test_failed_ssh_service_is_not_installed(
self, m_subp, m_update_ssh_config
):
"""If the ssh service is not installed, then no updates config and
no restart.
"""
cloud = self.tmp_cloud(distro="ubuntu")
cloud.distro.init_cmd = ["systemctl"]
cloud.distro.manage_service = mock.Mock(
side_effect=subp.ProcessExecutionError(
stderr="Service is not installed.", exit_code=4
)
)

setpass.handle_ssh_pwauth(True, cloud.distro)
self.assertIn(
r"WARNING: Ignoring config 'ssh_pwauth: True'."
r" SSH service 'ssh' is not installed.",
self.logs.getvalue(),
)
self.assertEqual(
[mock.call("status", "ssh")],
cloud.distro.manage_service.call_args_list,
)
self.assertEqual(m_update_ssh_config.call_count, 0)
self.assertEqual(m_subp.call_count, 0)
assert m_subp.call_count == n

@pytest.mark.parametrize(
"raised_error,warning_log,debug_log,update_ssh_call_count",
(
(
subp.ProcessExecutionError(
stderr="Service is not running.", exit_code=3
),
"Writing config 'ssh_pwauth: True'. SSH service"
" 'ssh' will not be restarted because it is stopped.",
"Not restarting SSH service: service is stopped.",
1,
),
(
subp.ProcessExecutionError(
stderr="Service is not installed.", exit_code=4
),
"Ignoring config 'ssh_pwauth: True'. SSH service 'ssh' is"
" not installed.",
None,
0,
),
(
subp.ProcessExecutionError(
stderr="Service is not available.", exit_code=2
),
"Ignoring config 'ssh_pwauth: True'. SSH service 'ssh'"
" is not available. Error: ",
None,
0,
),
),
)
@mock.patch(MODPATH + "update_ssh_config", return_value=True)
@mock.patch("cloudinit.distros.subp.subp")
def test_failed_ssh_service_is_not_available(
self, m_subp, m_update_ssh_config
def test_no_restart_when_service_is_not_running(
self,
m_subp,
m_update_ssh_config,
raised_error,
warning_log,
debug_log,
update_ssh_call_count,
caplog,
):
"""If the ssh service is not available, then no updates config and
no restart.
"""
cloud = self.tmp_cloud(distro="ubuntu")
cloud.distro.init_cmd = ["systemctl"]
process_error = "Service is not available."
cloud.distro.manage_service = mock.Mock(
side_effect=subp.ProcessExecutionError(
stderr=process_error, exit_code=2
)
)
"""Write config but don't restart SSH service when not running."""
cloud = get_cloud("ubuntu")
cloud.distro.manage_service = mock.Mock(side_effect=raised_error)

setpass.handle_ssh_pwauth(True, cloud.distro)
self.assertIn(
r"WARNING: Ignoring config 'ssh_pwauth: True'."
r" SSH service 'ssh' is not available. Error: ",
self.logs.getvalue(),
)
self.assertIn(process_error, self.logs.getvalue())
self.assertEqual(
[mock.call("status", "ssh")],
cloud.distro.manage_service.call_args_list,
)
self.assertEqual(m_update_ssh_config.call_count, 0)
self.assertEqual(m_subp.call_count, 0)
logs_by_level = {logging.WARNING: [], logging.DEBUG: []}
for _, level, msg in caplog.record_tuples:
logs_by_level[level].append(msg)
assert warning_log in "\n".join(logs_by_level[logging.WARNING])
if debug_log:
assert debug_log in logs_by_level[logging.DEBUG]
assert [
mock.call("status", "ssh")
] == cloud.distro.manage_service.call_args_list
assert m_update_ssh_config.call_count == update_ssh_call_count
m_subp.assert_not_called()


@pytest.mark.usefixtures("mock_uses_systemd")
class TestSetPasswordsHandle(CiTestCase):
"""Test cc_set_passwords.handle"""

Expand Down Expand Up @@ -231,19 +238,21 @@ def test_handle_on_chpasswd_list_parses_common_hashes(self, m_subp):
called = chpasswd.call_args[0][1]
self.assertEqual(valid, called)

@mock.patch(MODPATH + "util.is_BSD")
@mock.patch(MODPATH + "util.is_BSD", return_value=True)
@mock.patch(MODPATH + "subp.subp")
def test_bsd_calls_custom_pw_cmds_to_set_and_expire_passwords(
self, m_subp, m_is_bsd
):
"""BSD don't use chpasswd"""
m_is_bsd.return_value = True
cloud = self.tmp_cloud(distro="freebsd")
cloud = get_cloud(distro="freebsd")
valid_pwds = ["ubuntu:passw0rd"]
cfg = {"chpasswd": {"list": valid_pwds}}
setpass.handle(
"IGNORED", cfg=cfg, cloud=cloud, log=self.logger, args=[]
)
with mock.patch.object(
cloud.distro, "uses_systemd", return_value=False
):
setpass.handle(
"IGNORED", cfg=cfg, cloud=cloud, log=self.logger, args=[]
)
self.assertEqual(
[
mock.call(
Expand All @@ -252,7 +261,7 @@ def test_bsd_calls_custom_pw_cmds_to_set_and_expire_passwords(
logstring="chpasswd for ubuntu",
),
mock.call(["pw", "usermod", "ubuntu", "-p", "01-Jan-1970"]),
mock.call(["systemctl", "status", "sshd"], capture=True),
mock.call(["service", "sshd", "status"], capture=True),
],
m_subp.call_args_list,
)
Expand Down