From c7a3220b5d1cd051dc0d804a53e6df43ebc010e6 Mon Sep 17 00:00:00 2001 From: Alberto Contreras Date: Sat, 7 May 2022 16:08:12 +0200 Subject: [PATCH 1/8] Migrate test_status.py to pytest. --- tests/unittests/cmd/test_status.py | 757 ++++++++++++++--------------- 1 file changed, 371 insertions(+), 386 deletions(-) diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py index c5f424da812..d3d2db55628 100644 --- a/tests/unittests/cmd/test_status.py +++ b/tests/unittests/cmd/test_status.py @@ -4,169 +4,182 @@ from collections import namedtuple from io import StringIO from textwrap import dedent +from typing import Callable, Dict, Optional, Union +from unittest import mock + +import pytest from cloudinit.atomic_helper import write_json from cloudinit.cmd import status from cloudinit.util import ensure_file -from tests.unittests.helpers import CiTestCase, mock, wrap_and_call - -mypaths = namedtuple("MyPaths", "run_dir") -myargs = namedtuple("MyArgs", "long wait") - - -class TestStatus(CiTestCase): - def setUp(self): - super(TestStatus, self).setUp() - self.new_root = self.tmp_dir() - self.status_file = self.tmp_path("status.json", self.new_root) - self.disable_file = self.tmp_path("cloudinit-disable", self.new_root) - self.paths = mypaths(run_dir=self.new_root) - - class FakeInit(object): - paths = self.paths - - def __init__(self, ds_deps): - pass - - def read_cfg(self): - pass - - self.init_class = FakeInit - - def test__is_cloudinit_disabled_false_on_sysvinit(self): - """When not in an environment using systemd, return False.""" - ensure_file(self.disable_file) # Create the ignored disable file +from tests.unittests.helpers import wrap_and_call + +MyPaths = namedtuple("MyPaths", "run_dir") +MyArgs = namedtuple("MyArgs", "long wait") +Config = namedtuple( + "Config", "new_root, status_file, disable_file, result_file, paths" +) + + +@pytest.fixture(scope="function") +def config(tmpdir): + return Config( + new_root=tmpdir, + status_file=tmpdir.join("status.json"), + disable_file=tmpdir.join("cloudinit-disable"), + result_file=tmpdir.join("result.json"), + paths=MyPaths(run_dir=tmpdir), + ) + + +@pytest.fixture(scope="function") +def init_class(config): + class FakeInit(object): + paths = config.paths + + def __init__(self, ds_deps): + pass + + def read_cfg(self): + pass + + return FakeInit + + +class TestStatus: + @pytest.mark.parametrize( + [ + "ensured_file", + "uses_systemd", + "get_cmdline", + "expected_is_disabled", + "is_disabled_msg", + "expected_reason", + ], + [ + # When not in an environment using systemd, return False. + pytest.param( + lambda config: config.disable_file, + False, + "root=/dev/my-root not-important", + False, + "expected enabled cloud-init on sysvinit", + "Cloud-init enabled on sysvinit", + id="false_on_sysvinit", + ), + # When using systemd and disable_file is present return disabled. + pytest.param( + lambda config: config.disable_file, + True, + "root=/dev/my-root not-important", + True, + "expected disabled cloud-init", + lambda config: f"Cloud-init disabled by {config.disable_file}", + id="true_on_disable_file", + ), + # Not disabled when using systemd and enabled via commandline. + pytest.param( + lambda config: config.disable_file, + True, + "something cloud-init=enabled else", + False, + "expected enabled cloud-init", + "Cloud-init enabled by kernel command line cloud-init=enabled", + id="false_on_kernel_cmdline_enable", + ), + # When kernel command line disables cloud-init return True. + pytest.param( + None, + True, + "something cloud-init=disabled else", + True, + "expected disabled cloud-init", + "Cloud-init disabled by kernel parameter cloud-init=disabled", + id="true_on_kernel_cmdline", + ), + # When cloud-init-generator writes disabled file return True. + pytest.param( + lambda config: os.path.join(config.paths.run_dir, "disabled"), + True, + "something", + True, + "expected disabled cloud-init", + "Cloud-init disabled by cloud-init-generator", + id="true_when_generator_disables", + ), + # Report enabled when systemd generator creates the enabled file. + pytest.param( + lambda config: os.path.join(config.paths.run_dir, "enabled"), + True, + "something ignored", + False, + "expected enabled cloud-init", + "Cloud-init enabled by systemd cloud-init-generator", + id="false_when_enabled_in_systemd", + ), + ], + ) + def test__is_cloudinit_disabled( + self, + ensured_file: Optional[Callable], + uses_systemd: bool, + get_cmdline: str, + expected_is_disabled: bool, + is_disabled_msg: str, + expected_reason: Union[str, Callable], + config: Config, + ): + if ensured_file is not None: + ensure_file(ensured_file(config)) (is_disabled, reason) = wrap_and_call( "cloudinit.cmd.status", { - "uses_systemd": False, - "get_cmdline": "root=/dev/my-root not-important", + "uses_systemd": uses_systemd, + "get_cmdline": get_cmdline, }, status._is_cloudinit_disabled, - self.disable_file, - self.paths, - ) - self.assertFalse( - is_disabled, "expected enabled cloud-init on sysvinit" - ) - self.assertEqual("Cloud-init enabled on sysvinit", reason) - - def test__is_cloudinit_disabled_true_on_disable_file(self): - """When using systemd and disable_file is present return disabled.""" - ensure_file(self.disable_file) # Create observed disable file - (is_disabled, reason) = wrap_and_call( - "cloudinit.cmd.status", - { - "uses_systemd": True, - "get_cmdline": "root=/dev/my-root not-important", - }, - status._is_cloudinit_disabled, - self.disable_file, - self.paths, - ) - self.assertTrue(is_disabled, "expected disabled cloud-init") - self.assertEqual( - "Cloud-init disabled by {0}".format(self.disable_file), reason - ) - - def test__is_cloudinit_disabled_false_on_kernel_cmdline_enable(self): - """Not disabled when using systemd and enabled via commandline.""" - ensure_file(self.disable_file) # Create ignored disable file - (is_disabled, reason) = wrap_and_call( - "cloudinit.cmd.status", - { - "uses_systemd": True, - "get_cmdline": "something cloud-init=enabled else", - }, - status._is_cloudinit_disabled, - self.disable_file, - self.paths, - ) - self.assertFalse(is_disabled, "expected enabled cloud-init") - self.assertEqual( - "Cloud-init enabled by kernel command line cloud-init=enabled", - reason, - ) - - def test__is_cloudinit_disabled_true_on_kernel_cmdline(self): - """When kernel command line disables cloud-init return True.""" - (is_disabled, reason) = wrap_and_call( - "cloudinit.cmd.status", - { - "uses_systemd": True, - "get_cmdline": "something cloud-init=disabled else", - }, - status._is_cloudinit_disabled, - self.disable_file, - self.paths, - ) - self.assertTrue(is_disabled, "expected disabled cloud-init") - self.assertEqual( - "Cloud-init disabled by kernel parameter cloud-init=disabled", - reason, - ) - - def test__is_cloudinit_disabled_true_when_generator_disables(self): - """When cloud-init-generator writes disabled file return True.""" - disabled_file = os.path.join(self.paths.run_dir, "disabled") - ensure_file(disabled_file) - (is_disabled, reason) = wrap_and_call( - "cloudinit.cmd.status", - {"uses_systemd": True, "get_cmdline": "something"}, - status._is_cloudinit_disabled, - self.disable_file, - self.paths, - ) - self.assertTrue(is_disabled, "expected disabled cloud-init") - self.assertEqual("Cloud-init disabled by cloud-init-generator", reason) - - def test__is_cloudinit_disabled_false_when_enabled_in_systemd(self): - """Report enabled when systemd generator creates the enabled file.""" - enabled_file = os.path.join(self.paths.run_dir, "enabled") - ensure_file(enabled_file) - (is_disabled, reason) = wrap_and_call( - "cloudinit.cmd.status", - {"uses_systemd": True, "get_cmdline": "something ignored"}, - status._is_cloudinit_disabled, - self.disable_file, - self.paths, - ) - self.assertFalse(is_disabled, "expected enabled cloud-init") - self.assertEqual( - "Cloud-init enabled by systemd cloud-init-generator", reason + config.disable_file, + config.paths, ) + assert is_disabled == expected_is_disabled, is_disabled_msg + if isinstance(expected_reason, str): + assert reason == expected_reason + else: + assert reason == expected_reason(config) - def test_status_returns_not_run(self): + def test_status_returns_not_run(self, config: Config, init_class): """When status.json does not exist yet, return 'not run'.""" - self.assertFalse( - os.path.exists(self.status_file), "Unexpected status.json found" - ) - cmdargs = myargs(long=False, wait=False) + assert not os.path.exists( + config.status_file + ), "Unexpected status.json found" + cmdargs = MyArgs(long=False, wait=False) with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( "cloudinit.cmd.status", { "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "Init": {"side_effect": init_class}, }, status.handle_status_args, "ignored", cmdargs, ) - self.assertEqual(0, retcode) - self.assertEqual("status: not run\n", m_stdout.getvalue()) + assert retcode == 0 + assert m_stdout.getvalue() == "status: not run\n" - def test_status_returns_disabled_long_on_presence_of_disable_file(self): + def test_status_returns_disabled_long_on_presence_of_disable_file( + self, config: Config, init_class + ): """When cloudinit is disabled, return disabled reason.""" checked_files = [] def fakeexists(filepath): checked_files.append(filepath) - status_file = os.path.join(self.paths.run_dir, "status.json") + status_file = os.path.join(config.paths.run_dir, "status.json") return bool(not filepath == status_file) - cmdargs = myargs(long=True, wait=False) + cmdargs = MyArgs(long=True, wait=False) with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( "cloudinit.cmd.status", @@ -176,16 +189,16 @@ def fakeexists(filepath): True, "disabled for some reason", ), - "Init": {"side_effect": self.init_class}, + "Init": {"side_effect": init_class}, }, status.handle_status_args, "ignored", cmdargs, ) - self.assertEqual(0, retcode) - self.assertEqual( - [os.path.join(self.paths.run_dir, "status.json")], checked_files - ) + assert retcode == 0 + assert checked_files == [ + os.path.join(config.paths.run_dir, "status.json") + ] expected = dedent( """\ status: disabled @@ -193,245 +206,214 @@ def fakeexists(filepath): disabled for some reason """ ) - self.assertEqual(expected, m_stdout.getvalue()) - - def test_status_returns_running_on_no_results_json(self): - """Report running when status.json exists but result.json does not.""" - result_file = self.tmp_path("result.json", self.new_root) - write_json(self.status_file, {}) - self.assertFalse( - os.path.exists(result_file), "Unexpected result.json found" - ) - cmdargs = myargs(long=False, wait=False) - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - "cloudinit.cmd.status", + assert m_stdout.getvalue() == expected + + @pytest.mark.parametrize( + [ + "ensured_file", + "status_content", + "assert_file", + "cmdargs", + "expected_retcode", + "expected_status", + ], + [ + # Report running when status.json exists but result.json does not. + pytest.param( + None, + {}, + lambda config: config.result_file, + MyArgs(long=False, wait=False), + 0, + "status: running\n", + id="running_on_no_results_json", + ), + # Report running when status exists with an unfinished stage. + pytest.param( + lambda config: config.result_file, + {"v1": {"init": {"start": 1, "finished": None}}}, + None, + MyArgs(long=False, wait=False), + 0, + "status: running\n", + id="running", + ), + # Report done results.json exists no stages are unfinished. + pytest.param( + lambda config: config.result_file, { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "v1": { + "stage": None, # No current stage running + "datasource": ( + "DataSourceNoCloud " + "[seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "blah": {"finished": 123.456}, + "init": { + "errors": [], + "start": 124.567, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } }, - status.handle_status_args, - "ignored", - cmdargs, - ) - self.assertEqual(0, retcode) - self.assertEqual("status: running\n", m_stdout.getvalue()) - - def test_status_returns_running(self): - """Report running when status exists with an unfinished stage.""" - ensure_file(self.tmp_path("result.json", self.new_root)) - write_json( - self.status_file, {"v1": {"init": {"start": 1, "finished": None}}} - ) - cmdargs = myargs(long=False, wait=False) - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - "cloudinit.cmd.status", - { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, - }, - status.handle_status_args, - "ignored", - cmdargs, - ) - self.assertEqual(0, retcode) - self.assertEqual("status: running\n", m_stdout.getvalue()) - - def test_status_returns_done(self): - """Report done results.json exists no stages are unfinished.""" - ensure_file(self.tmp_path("result.json", self.new_root)) - write_json( - self.status_file, - { - "v1": { - "stage": None, # No current stage running - "datasource": ( - "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" - "[dsmode=net]" - ), - "blah": {"finished": 123.456}, - "init": { - "errors": [], - "start": 124.567, - "finished": 125.678, - }, - "init-local": {"start": 123.45, "finished": 123.46}, - } - }, - ) - cmdargs = myargs(long=False, wait=False) - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - "cloudinit.cmd.status", + None, + MyArgs(long=False, wait=False), + 0, + "status: done\n", + id="done", + ), + # Long format of done status includes datasource info. + pytest.param( + lambda config: config.result_file, { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "v1": { + "stage": None, + "datasource": ( + "DataSourceNoCloud " + "[seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "init": {"start": 124.567, "finished": 125.678}, + "init-local": {"start": 123.45, "finished": 123.46}, + } }, - status.handle_status_args, - "ignored", - cmdargs, - ) - self.assertEqual(0, retcode) - self.assertEqual("status: done\n", m_stdout.getvalue()) - - def test_status_returns_done_long(self): - """Long format of done status includes datasource info.""" - ensure_file(self.tmp_path("result.json", self.new_root)) - write_json( - self.status_file, - { - "v1": { - "stage": None, - "datasource": ( - "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" - "[dsmode=net]" - ), - "init": {"start": 124.567, "finished": 125.678}, - "init-local": {"start": 123.45, "finished": 123.46}, - } - }, - ) - cmdargs = myargs(long=True, wait=False) - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - "cloudinit.cmd.status", + None, + MyArgs(long=True, wait=False), + 0, + dedent( + """\ + status: done + time: Thu, 01 Jan 1970 00:02:05 +0000 + detail: + DataSourceNoCloud [seed=/var/.../seed/nocloud-net]\ +[dsmode=net] + """ + ), + id="returns_done_long", + ), + # Reports error when any stage has errors. + pytest.param( + None, { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "v1": { + "stage": None, + "blah": {"errors": [], "finished": 123.456}, + "init": { + "errors": ["error1"], + "start": 124.567, + "finished": 125.678, + }, + "init-local": {"start": 123.45, "finished": 123.46}, + } }, - status.handle_status_args, - "ignored", - cmdargs, - ) - self.assertEqual(0, retcode) - expected = dedent( - """\ - status: done - time: Thu, 01 Jan 1970 00:02:05 +0000 - detail: - DataSourceNoCloud [seed=/var/.../seed/nocloud-net][dsmode=net] - """ - ) - self.assertEqual(expected, m_stdout.getvalue()) - - def test_status_on_errors(self): - """Reports error when any stage has errors.""" - write_json( - self.status_file, - { - "v1": { - "stage": None, - "blah": {"errors": [], "finished": 123.456}, - "init": { - "errors": ["error1"], - "start": 124.567, - "finished": 125.678, - }, - "init-local": {"start": 123.45, "finished": 123.46}, - } - }, - ) - cmdargs = myargs(long=False, wait=False) - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - "cloudinit.cmd.status", + None, + MyArgs(long=False, wait=False), + 1, + "status: error\n", + id="on_errors", + ), + # Long format of error status includes all error messages. + pytest.param( + None, { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "v1": { + "stage": None, + "datasource": ( + "DataSourceNoCloud " + "[seed=/var/.../seed/nocloud-net]" + "[dsmode=net]" + ), + "init": { + "errors": ["error1"], + "start": 124.567, + "finished": 125.678, + }, + "init-local": { + "errors": ["error2", "error3"], + "start": 123.45, + "finished": 123.46, + }, + } }, - status.handle_status_args, - "ignored", - cmdargs, - ) - self.assertEqual(1, retcode) - self.assertEqual("status: error\n", m_stdout.getvalue()) - - def test_status_on_errors_long(self): - """Long format of error status includes all error messages.""" - write_json( - self.status_file, - { - "v1": { - "stage": None, - "datasource": ( - "DataSourceNoCloud [seed=/var/.../seed/nocloud-net]" - "[dsmode=net]" - ), - "init": { - "errors": ["error1"], - "start": 124.567, - "finished": 125.678, - }, - "init-local": { - "errors": ["error2", "error3"], - "start": 123.45, - "finished": 123.46, - }, - } - }, - ) - cmdargs = myargs(long=True, wait=False) - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - retcode = wrap_and_call( - "cloudinit.cmd.status", + None, + MyArgs(long=True, wait=False), + 1, + dedent( + """\ + status: error + time: Thu, 01 Jan 1970 00:02:05 +0000 + detail: + error1 + error2 + error3 + """ + ), + id="on_errors_long", + ), + # Long format reports the stage in which we are running. + pytest.param( + None, { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "v1": { + "stage": "init", + "init": {"start": 124.456, "finished": None}, + "init-local": {"start": 123.45, "finished": 123.46}, + } }, - status.handle_status_args, - "ignored", - cmdargs, - ) - self.assertEqual(1, retcode) - expected = dedent( - """\ - status: error - time: Thu, 01 Jan 1970 00:02:05 +0000 - detail: - error1 - error2 - error3 - """ - ) - self.assertEqual(expected, m_stdout.getvalue()) - - def test_status_returns_running_long_format(self): - """Long format reports the stage in which we are running.""" + None, + MyArgs(long=True, wait=False), + 0, + dedent( + """\ + status: running + time: Thu, 01 Jan 1970 00:02:04 +0000 + detail: + Running in stage: init + """ + ), + id="running_long_format", + ), + ], + ) + def test_status_output( + self, + ensured_file: Optional[Callable], + status_content: Dict, + assert_file, + cmdargs: MyArgs, + expected_retcode: int, + expected_status: str, + config: Config, + init_class, + ): + """""" + if ensured_file: + ensure_file(ensured_file(config)) write_json( - self.status_file, - { - "v1": { - "stage": "init", - "init": {"start": 124.456, "finished": None}, - "init-local": {"start": 123.45, "finished": 123.46}, - } - }, + config.status_file, + status_content, ) - cmdargs = myargs(long=True, wait=False) + if assert_file: + assert not os.path.exists( + config.result_file + ), f"Unexpected {config.result_file} found" with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( "cloudinit.cmd.status", { "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "Init": {"side_effect": init_class}, }, status.handle_status_args, "ignored", cmdargs, ) - self.assertEqual(0, retcode) - expected = dedent( - """\ - status: running - time: Thu, 01 Jan 1970 00:02:04 +0000 - detail: - Running in stage: init - """ - ) - self.assertEqual(expected, m_stdout.getvalue()) + assert retcode == expected_retcode + assert m_stdout.getvalue() == expected_status - def test_status_wait_blocks_until_done(self): + def test_status_wait_blocks_until_done(self, config: Config, init_class): """Specifying wait will poll every 1/4 second until done state.""" running_json = { "v1": { @@ -448,36 +430,37 @@ def test_status_wait_blocks_until_done(self): } } - self.sleep_calls = 0 + sleep_calls = 0 def fake_sleep(interval): - self.assertEqual(0.25, interval) - self.sleep_calls += 1 - if self.sleep_calls == 2: - write_json(self.status_file, running_json) - elif self.sleep_calls == 3: - write_json(self.status_file, done_json) - result_file = self.tmp_path("result.json", self.new_root) + nonlocal sleep_calls + assert interval == 0.25 + sleep_calls += 1 + if sleep_calls == 2: + write_json(config.status_file, running_json) + elif sleep_calls == 3: + write_json(config.status_file, done_json) + result_file = config.result_file ensure_file(result_file) - cmdargs = myargs(long=False, wait=True) + cmdargs = MyArgs(long=False, wait=True) with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( "cloudinit.cmd.status", { "sleep": {"side_effect": fake_sleep}, "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "Init": {"side_effect": init_class}, }, status.handle_status_args, "ignored", cmdargs, ) - self.assertEqual(0, retcode) - self.assertEqual(4, self.sleep_calls) - self.assertEqual("....\nstatus: done\n", m_stdout.getvalue()) + assert retcode == 0 + assert sleep_calls == 4 + assert m_stdout.getvalue() == "....\nstatus: done\n" - def test_status_wait_blocks_until_error(self): + def test_status_wait_blocks_until_error(self, config: Config, init_class): """Specifying wait will poll every 1/4 second until error state.""" running_json = { "v1": { @@ -498,51 +481,53 @@ def test_status_wait_blocks_until_error(self): } } - self.sleep_calls = 0 + sleep_calls = 0 def fake_sleep(interval): - self.assertEqual(0.25, interval) - self.sleep_calls += 1 - if self.sleep_calls == 2: - write_json(self.status_file, running_json) - elif self.sleep_calls == 3: - write_json(self.status_file, error_json) - - cmdargs = myargs(long=False, wait=True) + nonlocal sleep_calls + assert interval == 0.25 + sleep_calls += 1 + if sleep_calls == 2: + write_json(config.status_file, running_json) + elif sleep_calls == 3: + write_json(config.status_file, error_json) + + cmdargs = MyArgs(long=False, wait=True) with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( "cloudinit.cmd.status", { "sleep": {"side_effect": fake_sleep}, "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "Init": {"side_effect": init_class}, }, status.handle_status_args, "ignored", cmdargs, ) - self.assertEqual(1, retcode) - self.assertEqual(4, self.sleep_calls) - self.assertEqual("....\nstatus: error\n", m_stdout.getvalue()) + assert retcode == 1 + assert sleep_calls == 4 + assert m_stdout.getvalue() == "....\nstatus: error\n" - def test_status_main(self): + def test_status_main(self, config: Config, init_class, capsys): """status.main can be run as a standalone script.""" write_json( - self.status_file, {"v1": {"init": {"start": 1, "finished": None}}} + config.status_file, + {"v1": {"init": {"start": 1, "finished": None}}}, ) - with self.assertRaises(SystemExit) as context_manager: + with pytest.raises(SystemExit) as e: with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: wrap_and_call( "cloudinit.cmd.status", { "sys.argv": {"new": ["status"]}, "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": self.init_class}, + "Init": {"side_effect": init_class}, }, status.main, ) - self.assertEqual(0, context_manager.exception.code) - self.assertEqual("status: running\n", m_stdout.getvalue()) + assert e.value.code == 0 + assert m_stdout.getvalue() == "status: running\n" # vi: ts=4 expandtab syntax=python From 1bf1fe0f54156e8183a462f7a86ce03f02f139c2 Mon Sep 17 00:00:00 2001 From: Alberto Contreras Date: Mon, 9 May 2022 08:39:59 +0200 Subject: [PATCH 2/8] Handle file permission errors. - Unify the way of reading cfg paths in status.py by using `devel.read_cfg_paths`. - Add error handling for `devel.read_cfg_paths`. It outputs an error and sys-exists with 1. - Add tests covering this behavior for query, status and render cmds. - Migrate `test_render.py` to Pytest. --- cloudinit/cmd/devel/__init__.py | 21 ++- cloudinit/cmd/status.py | 19 +-- pyproject.toml | 3 - tests/unittests/cmd/devel/test_render.py | 194 ++++++++++++----------- tests/unittests/cmd/test_query.py | 94 +++++++---- tests/unittests/cmd/test_status.py | 127 ++++++++++----- 6 files changed, 276 insertions(+), 182 deletions(-) diff --git a/cloudinit/cmd/devel/__init__.py b/cloudinit/cmd/devel/__init__.py index ead5f7a9152..a78848ef3a0 100755 --- a/cloudinit/cmd/devel/__init__.py +++ b/cloudinit/cmd/devel/__init__.py @@ -3,10 +3,13 @@ """Common cloud-init devel commandline utility functions.""" +import errno import logging from cloudinit import log +from cloudinit.helpers import Paths from cloudinit.stages import Init +from cloudinit.util import error def addLogHandlerCLI(logger, log_level): @@ -16,10 +19,22 @@ def addLogHandlerCLI(logger, log_level): return logger -def read_cfg_paths(): - """Return a Paths object based on the system configuration on disk.""" +def read_cfg_paths() -> Paths: + """Return a Paths object based on the system configuration on disk. + + It handles file permission errors. + """ init = Init(ds_deps=[]) - init.read_cfg() + try: + init.read_cfg() + except OSError as e: + if e.errno == errno.EACCES: + error( + f"Failed reading config file(s) due to permission error:\n{e}", + rc=1, + sys_exit=True, + ) + raise return init.paths diff --git a/cloudinit/cmd/status.py b/cloudinit/cmd/status.py index f3b4f1616cd..1c7c209be5b 100755 --- a/cloudinit/cmd/status.py +++ b/cloudinit/cmd/status.py @@ -11,9 +11,10 @@ import os import sys from time import gmtime, sleep, strftime +from typing import Tuple +from cloudinit.cmd.devel import read_cfg_paths from cloudinit.distros import uses_systemd -from cloudinit.stages import Init from cloudinit.util import get_cmdline, load_file, load_json CLOUDINIT_DISABLED_FILE = "/etc/cloud/cloud-init.disabled" @@ -64,17 +65,16 @@ def get_parser(parser=None): return parser -def handle_status_args(name, args): +def handle_status_args(name, args) -> int: """Handle calls to 'cloud-init status' as a subcommand.""" # Read configured paths - init = Init(ds_deps=[]) - init.read_cfg() - status, status_detail, time = get_status_details(init.paths) + paths = read_cfg_paths() + status, status_detail, time = get_status_details(paths) if args.wait: while status in (UXAppStatus.NOT_RUN, UXAppStatus.RUNNING): sys.stdout.write(".") sys.stdout.flush() - status, status_detail, time = get_status_details(init.paths) + status, status_detail, time = get_status_details(paths) sleep(0.25) sys.stdout.write("\n") print("status: {0}".format(status.value)) @@ -115,17 +115,14 @@ def _is_cloudinit_disabled(disable_file, paths): return (is_disabled, reason) -def get_status_details(paths=None): +def get_status_details(paths=None) -> Tuple[UXAppStatus, str, str]: """Return a 3-tuple of status, status_details and time of last event. @param paths: An initialized cloudinit.helpers.paths object. Values are obtained from parsing paths.run_dir/status.json. """ - if not paths: - init = Init(ds_deps=[]) - init.read_cfg() - paths = init.paths + paths = paths or read_cfg_paths() status = UXAppStatus.NOT_RUN status_detail = "" diff --git a/pyproject.toml b/pyproject.toml index e1fb7dcaa55..26dfd58ff0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,12 +56,9 @@ exclude=[ '^tests/integration_tests/modules/test_growpart\.py$', '^tests/integration_tests/modules/test_ssh_keysfile\.py$', '^tests/unittests/__init__\.py$', - '^tests/unittests/cmd/devel/test_render\.py$', '^tests/unittests/cmd/test_clean\.py$', '^tests/unittests/cmd/test_cloud_id\.py$', '^tests/unittests/cmd/test_main\.py$', - '^tests/unittests/cmd/test_query\.py$', - '^tests/unittests/cmd/test_status\.py$', '^tests/unittests/config/test_cc_chef\.py$', '^tests/unittests/config/test_cc_landscape\.py$', '^tests/unittests/config/test_cc_locale\.py$', diff --git a/tests/unittests/cmd/devel/test_render.py b/tests/unittests/cmd/devel/test_render.py index 4afc64f02fc..8eb2e33b221 100644 --- a/tests/unittests/cmd/devel/test_render.py +++ b/tests/unittests/cmd/devel/test_render.py @@ -1,154 +1,164 @@ # This file is part of cloud-init. See LICENSE file for license information. -import os +import errno from collections import namedtuple from io import StringIO +import pytest + from cloudinit.cmd.devel import render from cloudinit.helpers import Paths from cloudinit.sources import INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE from cloudinit.util import ensure_dir, write_file -from tests.unittests.helpers import CiTestCase, mock, skipUnlessJinja - +from tests.unittests.helpers import mock, skipUnlessJinja -class TestRender(CiTestCase): +M_PATH = "cloudinit.cmd.devel.render." - with_logs = True - args = namedtuple("renderargs", "user_data instance_data debug") +class TestRender: - def setUp(self): - super(TestRender, self).setUp() - self.tmp = self.tmp_dir() + Args = namedtuple("Args", "user_data instance_data debug") - def test_handle_args_error_on_missing_user_data(self): + def test_handle_args_error_on_missing_user_data(self, caplog, tmpdir): """When user_data file path does not exist, log an error.""" - absent_file = self.tmp_path("user-data", dir=self.tmp) - instance_data = self.tmp_path("instance-data", dir=self.tmp) + absent_file = tmpdir.join("user-data") + instance_data = tmpdir.join("instance-data") write_file(instance_data, "{}") - args = self.args( + args = self.Args( user_data=absent_file, instance_data=instance_data, debug=False ) with mock.patch("sys.stderr", new_callable=StringIO): - self.assertEqual(1, render.handle_args("anyname", args)) - self.assertIn( - "Missing user-data file: %s" % absent_file, self.logs.getvalue() - ) + assert render.handle_args("anyname", args) == 1 + assert "Missing user-data file: %s" % absent_file in caplog.text - def test_handle_args_error_on_missing_instance_data(self): + def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir): """When instance_data file path does not exist, log an error.""" - user_data = self.tmp_path("user-data", dir=self.tmp) - absent_file = self.tmp_path("instance-data", dir=self.tmp) - args = self.args( + user_data = tmpdir.join("user-data") + absent_file = tmpdir.join("instance-data") + args = self.Args( user_data=user_data, instance_data=absent_file, debug=False ) with mock.patch("sys.stderr", new_callable=StringIO): - self.assertEqual(1, render.handle_args("anyname", args)) - self.assertIn( - "Missing instance-data.json file: %s" % absent_file, - self.logs.getvalue(), + assert render.handle_args("anyname", args) == 1 + assert ( + "Missing instance-data.json file: %s" % absent_file in caplog.text ) - def test_handle_args_defaults_instance_data(self): + @mock.patch(M_PATH + "read_cfg_paths") + def test_handle_args_defaults_instance_data(self, m_paths, caplog, tmpdir): """When no instance_data argument, default to configured run_dir.""" - user_data = self.tmp_path("user-data", dir=self.tmp) - run_dir = self.tmp_path("run_dir", dir=self.tmp) + user_data = tmpdir.join("user-data") + run_dir = tmpdir.join("run_dir") ensure_dir(run_dir) - paths = Paths({"run_dir": run_dir}) - self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") - self.m_paths.return_value = paths - args = self.args(user_data=user_data, instance_data=None, debug=False) + m_paths.return_value = Paths({"run_dir": run_dir}) + args = self.Args(user_data=user_data, instance_data=None, debug=False) with mock.patch("sys.stderr", new_callable=StringIO): - self.assertEqual(1, render.handle_args("anyname", args)) - json_file = os.path.join(run_dir, INSTANCE_JSON_FILE) - self.assertIn( - "Missing instance-data.json file: %s" % json_file, - self.logs.getvalue(), - ) - - def test_handle_args_root_fallback_from_sensitive_instance_data(self): + assert render.handle_args("anyname", args) == 1 + json_file = run_dir.join(INSTANCE_JSON_FILE) + msg = "Missing instance-data.json file: %s" % json_file + assert msg in caplog.text + + @mock.patch(M_PATH + "read_cfg_paths") + def test_handle_args_root_fallback_from_sensitive_instance_data( + self, m_paths, caplog, tmpdir + ): """When root user defaults to sensitive.json.""" - user_data = self.tmp_path("user-data", dir=self.tmp) - run_dir = self.tmp_path("run_dir", dir=self.tmp) + user_data = tmpdir.join("user-data") + run_dir = tmpdir.join("run_dir") ensure_dir(run_dir) - paths = Paths({"run_dir": run_dir}) - self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") - self.m_paths.return_value = paths - args = self.args(user_data=user_data, instance_data=None, debug=False) + m_paths.return_value = Paths({"run_dir": run_dir}) + args = self.Args(user_data=user_data, instance_data=None, debug=False) with mock.patch("sys.stderr", new_callable=StringIO): with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 - self.assertEqual(1, render.handle_args("anyname", args)) - json_file = os.path.join(run_dir, INSTANCE_JSON_FILE) - json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE) - self.assertIn( - "WARNING: Missing root-readable %s. Using redacted %s" - % (json_sensitive, json_file), - self.logs.getvalue(), - ) - self.assertIn( - "ERROR: Missing instance-data.json file: %s" % json_file, - self.logs.getvalue(), + assert render.handle_args("anyname", args) == 1 + json_file = run_dir.join(INSTANCE_JSON_FILE) + json_sensitive = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) + assert ( + "Missing root-readable %s. Using redacted %s" + % (json_sensitive, json_file) + in caplog.text ) + assert "Missing instance-data.json file: %s" % json_file in caplog.text - def test_handle_args_root_uses_sensitive_instance_data(self): + @mock.patch(M_PATH + "read_cfg_paths") + def test_handle_args_root_uses_sensitive_instance_data( + self, m_paths, tmpdir + ): """When root user, and no instance-data arg, use sensitive.json.""" - user_data = self.tmp_path("user-data", dir=self.tmp) + user_data = tmpdir.join("user-data") write_file(user_data, "##template: jinja\nrendering: {{ my_var }}") - run_dir = self.tmp_path("run_dir", dir=self.tmp) + run_dir = tmpdir.join("run_dir") ensure_dir(run_dir) - json_sensitive = os.path.join(run_dir, INSTANCE_JSON_SENSITIVE_FILE) + json_sensitive = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) write_file(json_sensitive, '{"my-var": "jinja worked"}') - paths = Paths({"run_dir": run_dir}) - self.add_patch("cloudinit.cmd.devel.render.read_cfg_paths", "m_paths") - self.m_paths.return_value = paths - args = self.args(user_data=user_data, instance_data=None, debug=False) - with mock.patch("sys.stderr", new_callable=StringIO): - with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - with mock.patch("os.getuid") as m_getuid: - m_getuid.return_value = 0 - self.assertEqual(0, render.handle_args("anyname", args)) - self.assertIn("rendering: jinja worked", m_stdout.getvalue()) + m_paths.return_value = Paths({"run_dir": run_dir}) + args = self.Args(user_data=user_data, instance_data=None, debug=False) + with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: + with mock.patch("os.getuid") as m_getuid: + m_getuid.return_value = 0 + assert render.handle_args("anyname", args) == 0 + assert "rendering: jinja worked" in m_stdout.getvalue() @skipUnlessJinja() - def test_handle_args_renders_instance_data_vars_in_template(self): + def test_handle_args_renders_instance_data_vars_in_template( + self, caplog, tmpdir + ): """If user_data file is a jinja template render instance-data vars.""" - user_data = self.tmp_path("user-data", dir=self.tmp) + user_data = tmpdir.join("user-data") write_file(user_data, "##template: jinja\nrendering: {{ my_var }}") - instance_data = self.tmp_path("instance-data", dir=self.tmp) + instance_data = tmpdir.join("instance-data") write_file(instance_data, '{"my-var": "jinja worked"}') - args = self.args( + args = self.Args( user_data=user_data, instance_data=instance_data, debug=True ) with mock.patch("sys.stderr", new_callable=StringIO) as m_console_err: with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: - self.assertEqual(0, render.handle_args("anyname", args)) - self.assertIn( - "DEBUG: Converted jinja variables\n{", self.logs.getvalue() - ) - self.assertIn( - "DEBUG: Converted jinja variables\n{", m_console_err.getvalue() - ) - self.assertEqual("rendering: jinja worked", m_stdout.getvalue()) + assert render.handle_args("anyname", args) == 0 + assert "Converted jinja variables\n{" in caplog.text + assert "Converted jinja variables\n{" in m_console_err.getvalue() + assert "rendering: jinja worked" == m_stdout.getvalue() @skipUnlessJinja() - def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation(self): + def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation( + self, caplog, tmpdir + ): """If user_data file has invalid jinja operations log warnings.""" - user_data = self.tmp_path("user-data", dir=self.tmp) + user_data = tmpdir.join("user-data") write_file(user_data, "##template: jinja\nrendering: {{ my-var }}") - instance_data = self.tmp_path("instance-data", dir=self.tmp) + instance_data = tmpdir.join("instance-data") write_file(instance_data, '{"my-var": "jinja worked"}') - args = self.args( + args = self.Args( user_data=user_data, instance_data=instance_data, debug=True ) with mock.patch("sys.stderr", new_callable=StringIO): - self.assertEqual(1, render.handle_args("anyname", args)) - self.assertIn( - "WARNING: Ignoring jinja template for %s: Undefined jinja" + assert render.handle_args("anyname", args) == 1 + assert ( + "Ignoring jinja template for %s: Undefined jinja" ' variable: "my-var". Jinja tried subtraction. Perhaps you meant' - ' "my_var"?' % user_data, - self.logs.getvalue(), + ' "my_var"?' % user_data + ) in caplog.text + + @mock.patch( + "cloudinit.cmd.devel.Init.read_cfg", + side_effect=OSError(errno.EACCES, "Not allowed"), + ) + def test_handle_args_error_when_no_read_permission_instance_data2( + self, + m_read_cfg, + ): + """When instance_data file is unreadable, log an error.""" + args = self.Args(user_data=None, instance_data=None, debug=False) + with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: + with pytest.raises(SystemExit) as exc_info: + render.handle_args("anyname", args) + assert exc_info.value.code == 1 + expected_error = ( + "Error:\nFailed reading config file(s) due to permission error:\n" + "[Errno 13] Not allowed\n" ) + assert m_stderr.getvalue() == expected_error + assert m_read_cfg.call_count == 1 # vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_query.py b/tests/unittests/cmd/test_query.py index 03a73bb5259..825dff18b83 100644 --- a/tests/unittests/cmd/test_query.py +++ b/tests/unittests/cmd/test_query.py @@ -5,7 +5,7 @@ import json import os from collections import namedtuple -from io import BytesIO +from io import BytesIO, StringIO from textwrap import dedent import pytest @@ -20,6 +20,8 @@ from cloudinit.util import b64e, write_file from tests.unittests.helpers import mock +M_PATH = "cloudinit.cmd.query." + def _gzip_data(data): with BytesIO() as iobuf: @@ -28,11 +30,11 @@ def _gzip_data(data): return iobuf.getvalue() -@mock.patch("cloudinit.cmd.query.addLogHandlerCLI", lambda *args: "") +@mock.patch(M_PATH + "addLogHandlerCLI", lambda *args: "") class TestQuery: - args = namedtuple( - "queryargs", + Args = namedtuple( + "Args", "debug dump_all format instance_data list_keys user_data vendor_data" " varname", ) @@ -70,7 +72,7 @@ def _setup_paths(self, tmpdir, ud_val=None, vd_val=None): def test_handle_args_error_on_missing_param(self, caplog, capsys): """Error when missing required parameters and print usage.""" - args = self.args( + args = self.Args( debug=False, dump_all=False, format=None, @@ -81,7 +83,7 @@ def test_handle_args_error_on_missing_param(self, caplog, capsys): varname=None, ) with mock.patch( - "cloudinit.cmd.query.addLogHandlerCLI", return_value="" + M_PATH + "addLogHandlerCLI", return_value="" ) as m_cli_log: assert 1 == query.handle_args("anyname", args) expected_error = ( @@ -114,7 +116,7 @@ def test_handle_args_error_on_invalid_vaname_paths( """Error when varname is not a valid instance-data variable path.""" instance_data = tmpdir.join("instance-data") instance_data.write(inst_data) - args = self.args( + args = self.Args( debug=False, dump_all=False, format=None, @@ -125,12 +127,10 @@ def test_handle_args_error_on_invalid_vaname_paths( varname=varname, ) paths, _, _, _ = self._setup_paths(tmpdir) - with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + with mock.patch(M_PATH + "read_cfg_paths") as m_paths: m_paths.return_value = paths - with mock.patch( - "cloudinit.cmd.query.addLogHandlerCLI", return_value="" - ): - with mock.patch("cloudinit.cmd.query.load_userdata") as m_lud: + with mock.patch(M_PATH + "addLogHandlerCLI", return_value=""): + with mock.patch(M_PATH + "load_userdata") as m_lud: m_lud.return_value = "ud" assert 1 == query.handle_args("anyname", args) assert expected_error in caplog.text @@ -138,7 +138,7 @@ def test_handle_args_error_on_invalid_vaname_paths( def test_handle_args_error_on_missing_instance_data(self, caplog, tmpdir): """When instance_data file path does not exist, log an error.""" absent_fn = tmpdir.join("absent") - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -159,7 +159,7 @@ def test_handle_args_error_when_no_read_permission_instance_data( """When instance_data file is unreadable, log an error.""" noread_fn = tmpdir.join("unreadable") noread_fn.write("thou shall not pass") - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -169,15 +169,45 @@ def test_handle_args_error_when_no_read_permission_instance_data( vendor_data="vd", varname=None, ) - with mock.patch("cloudinit.cmd.query.util.load_file") as m_load: + with mock.patch(M_PATH + "util.load_file") as m_load: m_load.side_effect = OSError(errno.EACCES, "Not allowed") assert 1 == query.handle_args("anyname", args) msg = "No read permission on '%s'. Try sudo" % noread_fn assert msg in caplog.text + @mock.patch( + "cloudinit.cmd.devel.Init.read_cfg", + side_effect=OSError(errno.EACCES, "Not allowed"), + ) + def test_handle_args_error_when_no_read_permission_init_cfg( + self, + m_read_cfg, + ): + """When init cfg file is unreadable, log an error.""" + args = self.Args( + debug=False, + dump_all=True, + format=None, + instance_data=None, + list_keys=False, + user_data=None, + vendor_data=None, + varname=None, + ) + with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: + with pytest.raises(SystemExit) as exc_info: + query.handle_args("anyname", args) + assert exc_info.value.code == 1 + expected_error = ( + "Error:\nFailed reading config file(s) due to permission error:\n" + "[Errno 13] Not allowed\n" + ) + assert m_stderr.getvalue() == expected_error + assert m_read_cfg.call_count == 1 + def test_handle_args_defaults_instance_data(self, caplog, tmpdir): """When no instance_data argument, default to configured run_dir.""" - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -188,7 +218,7 @@ def test_handle_args_defaults_instance_data(self, caplog, tmpdir): varname=None, ) paths, run_dir, _, _ = self._setup_paths(tmpdir) - with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + with mock.patch(M_PATH + "read_cfg_paths") as m_paths: m_paths.return_value = paths assert 1 == query.handle_args("anyname", args) json_file = run_dir.join(INSTANCE_JSON_FILE) @@ -197,7 +227,7 @@ def test_handle_args_defaults_instance_data(self, caplog, tmpdir): def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir): """When no instance_data argument, root falls back to redacted json.""" - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -208,7 +238,7 @@ def test_handle_args_root_fallsback_to_instance_data(self, caplog, tmpdir): varname=None, ) paths, run_dir, _, _ = self._setup_paths(tmpdir) - with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + with mock.patch(M_PATH + "read_cfg_paths") as m_paths: m_paths.return_value = paths with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 @@ -239,7 +269,7 @@ def test_handle_args_root_processes_user_data( ) sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) sensitive_file.write('{"my-var": "it worked"}') - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -249,7 +279,7 @@ def test_handle_args_root_processes_user_data( vendor_data=vendor_data.strpath, varname=None, ) - with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + with mock.patch(M_PATH + "read_cfg_paths") as m_paths: m_paths.return_value = paths with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 @@ -277,7 +307,7 @@ def test_handle_args_user_vendor_data_defaults_to_instance_link( vd_path = os.path.join(paths.instance_link, "vendor-data.txt") write_file(vd_path, "instance_link_vd") - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -287,7 +317,7 @@ def test_handle_args_user_vendor_data_defaults_to_instance_link( vendor_data=None, varname=None, ) - with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + with mock.patch(M_PATH + "read_cfg_paths") as m_paths: m_paths.return_value = paths with mock.patch("os.getuid", return_value=0): assert 0 == query.handle_args("anyname", args) @@ -308,7 +338,7 @@ def test_handle_args_root_uses_instance_sensitive_data( ) sensitive_file = run_dir.join(INSTANCE_JSON_SENSITIVE_FILE) sensitive_file.write('{"my-var": "it worked"}') - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -318,7 +348,7 @@ def test_handle_args_root_uses_instance_sensitive_data( vendor_data=vendor_data.strpath, varname=None, ) - with mock.patch("cloudinit.cmd.query.read_cfg_paths") as m_paths: + with mock.patch(M_PATH + "read_cfg_paths") as m_paths: m_paths.return_value = paths with mock.patch("os.getuid") as m_getuid: m_getuid.return_value = 0 @@ -334,7 +364,7 @@ def test_handle_args_dumps_all_instance_data(self, capsys, tmpdir): """When --all is specified query will dump all instance data vars.""" instance_data = tmpdir.join("instance-data") instance_data.write('{"my-var": "it worked"}') - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -359,7 +389,7 @@ def test_handle_args_returns_top_level_varname(self, capsys, tmpdir): """When the argument varname is passed, report its value.""" instance_data = tmpdir.join("instance-data") instance_data.write('{"my-var": "it worked"}') - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -398,7 +428,7 @@ def test_handle_args_returns_nested_varname( """If user_data file is a jinja template render instance-data vars.""" instance_data = tmpdir.join("instance-data") instance_data.write(inst_data) - args = self.args( + args = self.Args( debug=False, dump_all=False, format=None, @@ -440,7 +470,7 @@ def test_handle_args_returns_standardized_vars_to_top_level_aliases( } """ ) - args = self.args( + args = self.Args( debug=False, dump_all=True, format=None, @@ -466,7 +496,7 @@ def test_handle_args_list_keys_sorts_top_level_keys_when_no_varname( ' "top": "gun"}' ) expected = "top\nuserdata\nv1\nv1_1\nv2\nv2_2\nvendordata\n" - args = self.args( + args = self.Args( debug=False, dump_all=False, format=None, @@ -492,7 +522,7 @@ def test_handle_args_list_keys_sorts_nested_keys_when_varname( + ' {"v2_2": "val2.2"}, "top": "gun"}' ) expected = "v1_1\nv1_2\n" - args = self.args( + args = self.Args( debug=False, dump_all=False, format=None, @@ -518,7 +548,7 @@ def test_handle_args_list_keys_errors_when_varname_is_not_a_dict( + '{"v2_2": "val2.2"}, "top": "gun"}' ) expected_error = "--list-keys provided but 'top' is not a dict" - args = self.args( + args = self.Args( debug=False, dump_all=False, format=None, diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py index d3d2db55628..79f47992909 100644 --- a/tests/unittests/cmd/test_status.py +++ b/tests/unittests/cmd/test_status.py @@ -1,5 +1,6 @@ # This file is part of cloud-init. See LICENSE file for license information. +import errno import os from collections import namedtuple from io import StringIO @@ -14,6 +15,9 @@ from cloudinit.util import ensure_file from tests.unittests.helpers import wrap_and_call +M_NAME = "cloudinit.cmd.status" +M_PATH = f"{M_NAME}." + MyPaths = namedtuple("MyPaths", "run_dir") MyArgs = namedtuple("MyArgs", "long wait") Config = namedtuple( @@ -32,20 +36,6 @@ def config(tmpdir): ) -@pytest.fixture(scope="function") -def init_class(config): - class FakeInit(object): - paths = config.paths - - def __init__(self, ds_deps): - pass - - def read_cfg(self): - pass - - return FakeInit - - class TestStatus: @pytest.mark.parametrize( [ @@ -132,7 +122,7 @@ def test__is_cloudinit_disabled( if ensured_file is not None: ensure_file(ensured_file(config)) (is_disabled, reason) = wrap_and_call( - "cloudinit.cmd.status", + M_NAME, { "uses_systemd": uses_systemd, "get_cmdline": get_cmdline, @@ -147,19 +137,18 @@ def test__is_cloudinit_disabled( else: assert reason == expected_reason(config) - def test_status_returns_not_run(self, config: Config, init_class): + @mock.patch(M_PATH + "read_cfg_paths") + def test_status_returns_not_run(self, m_read_cfg_paths, config: Config): """When status.json does not exist yet, return 'not run'.""" + m_read_cfg_paths.return_value = config.paths assert not os.path.exists( config.status_file ), "Unexpected status.json found" cmdargs = MyArgs(long=False, wait=False) with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - "cloudinit.cmd.status", - { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": init_class}, - }, + M_NAME, + {"_is_cloudinit_disabled": (False, "")}, status.handle_status_args, "ignored", cmdargs, @@ -167,11 +156,12 @@ def test_status_returns_not_run(self, config: Config, init_class): assert retcode == 0 assert m_stdout.getvalue() == "status: not run\n" + @mock.patch(M_PATH + "read_cfg_paths") def test_status_returns_disabled_long_on_presence_of_disable_file( - self, config: Config, init_class + self, m_read_cfg_paths, config: Config ): """When cloudinit is disabled, return disabled reason.""" - + m_read_cfg_paths.return_value = config.paths checked_files = [] def fakeexists(filepath): @@ -182,14 +172,13 @@ def fakeexists(filepath): cmdargs = MyArgs(long=True, wait=False) with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - "cloudinit.cmd.status", + M_NAME, { "os.path.exists": {"side_effect": fakeexists}, "_is_cloudinit_disabled": ( True, "disabled for some reason", ), - "Init": {"side_effect": init_class}, }, status.handle_status_args, "ignored", @@ -377,8 +366,10 @@ def fakeexists(filepath): ), ], ) + @mock.patch(M_PATH + "read_cfg_paths") def test_status_output( self, + m_read_cfg_paths, ensured_file: Optional[Callable], status_content: Dict, assert_file, @@ -386,9 +377,8 @@ def test_status_output( expected_retcode: int, expected_status: str, config: Config, - init_class, ): - """""" + m_read_cfg_paths.return_value = config.paths if ensured_file: ensure_file(ensured_file(config)) write_json( @@ -401,11 +391,8 @@ def test_status_output( ), f"Unexpected {config.result_file} found" with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - "cloudinit.cmd.status", - { - "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": init_class}, - }, + M_NAME, + {"_is_cloudinit_disabled": (False, "")}, status.handle_status_args, "ignored", cmdargs, @@ -413,8 +400,12 @@ def test_status_output( assert retcode == expected_retcode assert m_stdout.getvalue() == expected_status - def test_status_wait_blocks_until_done(self, config: Config, init_class): + @mock.patch(M_PATH + "read_cfg_paths") + def test_status_wait_blocks_until_done( + self, m_read_cfg_paths, config: Config + ): """Specifying wait will poll every 1/4 second until done state.""" + m_read_cfg_paths.return_value = config.paths running_json = { "v1": { "stage": "init", @@ -446,11 +437,10 @@ def fake_sleep(interval): cmdargs = MyArgs(long=False, wait=True) with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - "cloudinit.cmd.status", + M_NAME, { "sleep": {"side_effect": fake_sleep}, "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": init_class}, }, status.handle_status_args, "ignored", @@ -460,8 +450,12 @@ def fake_sleep(interval): assert sleep_calls == 4 assert m_stdout.getvalue() == "....\nstatus: done\n" - def test_status_wait_blocks_until_error(self, config: Config, init_class): + @mock.patch(M_PATH + "read_cfg_paths") + def test_status_wait_blocks_until_error( + self, m_read_cfg_paths, config: Config + ): """Specifying wait will poll every 1/4 second until error state.""" + m_read_cfg_paths.return_value = config.paths running_json = { "v1": { "stage": "init", @@ -495,11 +489,10 @@ def fake_sleep(interval): cmdargs = MyArgs(long=False, wait=True) with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: retcode = wrap_and_call( - "cloudinit.cmd.status", + M_NAME, { "sleep": {"side_effect": fake_sleep}, "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": init_class}, }, status.handle_status_args, "ignored", @@ -509,8 +502,10 @@ def fake_sleep(interval): assert sleep_calls == 4 assert m_stdout.getvalue() == "....\nstatus: error\n" - def test_status_main(self, config: Config, init_class, capsys): + @mock.patch(M_PATH + "read_cfg_paths") + def test_status_main(self, m_read_cfg_paths, config: Config): """status.main can be run as a standalone script.""" + m_read_cfg_paths.return_value = config.paths write_json( config.status_file, {"v1": {"init": {"start": 1, "finished": None}}}, @@ -518,16 +513,66 @@ def test_status_main(self, config: Config, init_class, capsys): with pytest.raises(SystemExit) as e: with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: wrap_and_call( - "cloudinit.cmd.status", + M_NAME, { "sys.argv": {"new": ["status"]}, "_is_cloudinit_disabled": (False, ""), - "Init": {"side_effect": init_class}, }, status.main, ) assert e.value.code == 0 assert m_stdout.getvalue() == "status: running\n" + @mock.patch( + "cloudinit.cmd.devel.Init.read_cfg", + side_effect=OSError(errno.EACCES, "Not allowed"), + ) + def test_status_no_read_permission_init_config(self, m_read_cfg): + """status.handle_status_args outputs to stderr and exists with 1 if + some init cfg file has no user permissions. + """ + + cmdargs = MyArgs(long=False, wait=True) + with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: + with pytest.raises(SystemExit) as exc_info: + wrap_and_call( + M_NAME, + { + "sleep": {"side_effect": lambda *_: None}, + "_is_cloudinit_disabled": (False, ""), + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + assert exc_info.value.code == 1 + expected_error = ( + "Error:\nFailed reading config file(s) due to permission error:\n" + "[Errno 13] Not allowed\n" + ) + assert m_stderr.getvalue() == expected_error + assert m_read_cfg.call_count == 1 + + @mock.patch( + "cloudinit.cmd.devel.Init.read_cfg", + side_effect=OSError(errno.EACCES, "Not allowed"), + ) + def test_get_status_details_no_read_permission_init_config( + self, m_read_cfg + ): + """status.get_status_details outputs to stderr and exists with 1 if + some init cfg file has no user permissions. + """ + with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: + with pytest.raises(SystemExit) as exc_info: + status.get_status_details() + assert exc_info.value.code == 1 + expected_error = ( + "Error:\nFailed reading config file(s) due to permission error:\n" + "[Errno 13] Not allowed\n" + ) + assert m_stderr.getvalue() == expected_error + assert m_read_cfg.call_count == 1 + # vi: ts=4 expandtab syntax=python From 1700565324282e1d1c0a1498397e502b6b6feaed Mon Sep 17 00:00:00 2001 From: Alberto Contreras Date: Mon, 9 May 2022 11:15:19 +0200 Subject: [PATCH 3/8] Fix test. --- tests/unittests/cmd/devel/test_render.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/unittests/cmd/devel/test_render.py b/tests/unittests/cmd/devel/test_render.py index 8eb2e33b221..d8a96cf2dcb 100644 --- a/tests/unittests/cmd/devel/test_render.py +++ b/tests/unittests/cmd/devel/test_render.py @@ -112,11 +112,13 @@ def test_handle_args_renders_instance_data_vars_in_template( args = self.Args( user_data=user_data, instance_data=instance_data, debug=True ) - with mock.patch("sys.stderr", new_callable=StringIO) as m_console_err: + with mock.patch("sys.stderr", new_callable=StringIO): with mock.patch("sys.stdout", new_callable=StringIO) as m_stdout: assert render.handle_args("anyname", args) == 0 assert "Converted jinja variables\n{" in caplog.text - assert "Converted jinja variables\n{" in m_console_err.getvalue() + # TODO enable after pytest>=3.4 + # more info: https://docs.pytest.org/en/stable/how-to/logging.html + # assert "Converted jinja variables\n{" in m_stderr.getvalue() assert "rendering: jinja worked" == m_stdout.getvalue() @skipUnlessJinja() From c647e96e2b89ef9038f132551eb259594e35e822 Mon Sep 17 00:00:00 2001 From: Alberto Contreras Date: Wed, 11 May 2022 14:27:10 +0200 Subject: [PATCH 4/8] Redact confs coming from permission errors. --- cloudinit/cmd/devel/__init__.py | 13 +- cloudinit/cmd/devel/render.py | 5 +- cloudinit/cmd/status.py | 5 +- cloudinit/stages.py | 58 ++++--- cloudinit/util.py | 18 +- pyproject.toml | 2 - tests/unittests/cmd/devel/test_render.py | 35 ++-- tests/unittests/cmd/test_query.py | 38 +++-- tests/unittests/cmd/test_status.py | 77 ++++----- tests/unittests/test_stages.py | 86 +++++++--- tests/unittests/test_util.py | 201 +++++++++++++++-------- 11 files changed, 323 insertions(+), 215 deletions(-) diff --git a/cloudinit/cmd/devel/__init__.py b/cloudinit/cmd/devel/__init__.py index a78848ef3a0..55eda957be8 100755 --- a/cloudinit/cmd/devel/__init__.py +++ b/cloudinit/cmd/devel/__init__.py @@ -3,13 +3,11 @@ """Common cloud-init devel commandline utility functions.""" -import errno import logging from cloudinit import log from cloudinit.helpers import Paths from cloudinit.stages import Init -from cloudinit.util import error def addLogHandlerCLI(logger, log_level): @@ -25,16 +23,7 @@ def read_cfg_paths() -> Paths: It handles file permission errors. """ init = Init(ds_deps=[]) - try: - init.read_cfg() - except OSError as e: - if e.errno == errno.EACCES: - error( - f"Failed reading config file(s) due to permission error:\n{e}", - rc=1, - sys_exit=True, - ) - raise + init.read_cfg() return init.paths diff --git a/cloudinit/cmd/devel/render.py b/cloudinit/cmd/devel/render.py index 62b432d20fa..cfbcce7167e 100755 --- a/cloudinit/cmd/devel/render.py +++ b/cloudinit/cmd/devel/render.py @@ -63,7 +63,10 @@ def handle_args(name, args): if args.instance_data: instance_data_fn = args.instance_data else: - paths = read_cfg_paths() + try: + paths = read_cfg_paths() + except (IOError, OSError): + return 1 uid = os.getuid() redacted_data_fn = os.path.join(paths.run_dir, INSTANCE_JSON_FILE) if uid == 0: diff --git a/cloudinit/cmd/status.py b/cloudinit/cmd/status.py index 1c7c209be5b..1b32d473678 100755 --- a/cloudinit/cmd/status.py +++ b/cloudinit/cmd/status.py @@ -68,7 +68,10 @@ def get_parser(parser=None): def handle_status_args(name, args) -> int: """Handle calls to 'cloud-init status' as a subcommand.""" # Read configured paths - paths = read_cfg_paths() + try: + paths = read_cfg_paths() + except (IOError, OSError): + return 1 status, status_detail, time = get_status_details(paths) if args.wait: while status in (UXAppStatus.NOT_RUN, UXAppStatus.RUNNING): diff --git a/cloudinit/stages.py b/cloudinit/stages.py index 4ebe413bc25..02811befe2a 100644 --- a/cloudinit/stages.py +++ b/cloudinit/stages.py @@ -5,11 +5,13 @@ # This file is part of cloud-init. See LICENSE file for license information. import copy +import errno import os import pickle import sys from collections import namedtuple -from typing import Dict, List, Optional, Set +from functools import partial +from typing import Dict, Iterable, List, Optional, Set from cloudinit import cloud, distros, handlers, helpers, importer from cloudinit import log as logging @@ -58,12 +60,12 @@ def update_event_enabled( case, we only have the data source's `default_update_events`, so an event that should be enabled in userdata may be denied. """ - default_events = ( - datasource.default_update_events - ) # type: Dict[EventScope, Set[EventType]] - user_events = userdata_to_events( + default_events: Dict[ + EventScope, Set[EventType] + ] = datasource.default_update_events + user_events: Dict[EventScope, Set[EventType]] = userdata_to_events( cfg.get("updates", {}) - ) # type: Dict[EventScope, Set[EventType]] + ) # A value in the first will override a value in the second allowed = util.mergemanydict( [ @@ -73,6 +75,7 @@ def update_event_enabled( ) LOG.debug("Allowed events: %s", allowed) + scopes: Iterable[EventScope] if not scope: scopes = allowed.keys() else: @@ -953,23 +956,38 @@ def should_run_on_boot_event(): def read_runtime_config(): - return util.read_conf(RUN_CLOUD_CONFIG) + try: + return util.read_conf(RUN_CLOUD_CONFIG) + except OSError as e: + if e.errno == errno.EACCES: + LOG.warning( + "REDACTED config part %s for non-root user", + RUN_CLOUD_CONFIG, + ) + raise def fetch_base_config(): - return util.mergemanydict( - [ - # builtin config, hardcoded in settings.py. - util.get_builtin_cfg(), - # Anything in your conf.d or 'default' cloud.cfg location. - util.read_conf_with_confd(CLOUD_CONFIG), - # runtime config. I.e., /run/cloud-init/cloud.cfg - read_runtime_config(), - # Kernel/cmdline parameters override system config - util.read_conf_from_cmdline(), - ], - reverse=True, - ) + config_loaders = [ + # builtin config, hardcoded in settings.py. + util.get_builtin_cfg, + # Anything in your conf.d or 'default' cloud.cfg location. + partial(util.read_conf_with_confd, CLOUD_CONFIG), + # runtime config. I.e., /run/cloud-init/cloud.cfg + read_runtime_config, + # Kernel/cmdline parameters override system config + util.read_conf_from_cmdline, + ] + configs = [] + for config_loader in config_loaders: + try: + configs.append(config_loader()) + except OSError as e: + if e.errno == errno.EACCES: + pass # Already logged in `config_loaders`. + else: + raise + return util.mergemanydict(configs, reverse=True) def _pkl_store(obj, fname): diff --git a/cloudinit/util.py b/cloudinit/util.py index da2f63dac1e..a8ce3768029 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -32,7 +32,7 @@ import sys import time from base64 import b64decode, b64encode -from errno import ENOENT +from errno import EACCES, ENOENT from functools import lru_cache from typing import List from urllib import parse @@ -998,6 +998,7 @@ def read_seeded(base="", ext="", timeout=5, retries=10, file_retries=0): def read_conf_d(confd): + """Read configuration directory.""" # Get reverse sorted list (later trumps newer) confs = sorted(os.listdir(confd), reverse=True) @@ -1010,13 +1011,24 @@ def read_conf_d(confd): # Load them all so that they can be merged cfgs = [] for fn in confs: - cfgs.append(read_conf(os.path.join(confd, fn))) + try: + cfgs.append(read_conf(os.path.join(confd, fn))) + except OSError as e: + if e.errno == EACCES: + LOG.warning( + "REDACTED config part %s/%s for non-root user", confd, fn + ) return mergemanydict(cfgs) def read_conf_with_confd(cfgfile): - cfg = read_conf(cfgfile) + try: + cfg = read_conf(cfgfile) + except OSError as e: + if e.errno == EACCES: + LOG.warning("REDACTED config part %s for non-root user", cfgfile) + raise confd = False if "conf_d" in cfg: diff --git a/pyproject.toml b/pyproject.toml index 26dfd58ff0f..1aac03a8559 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,6 @@ exclude=[ '^cloudinit/sources/DataSourceVMware\.py$', '^cloudinit/sources/__init__\.py$', '^cloudinit/sources/helpers/vmware/imc/config_file\.py$', - '^cloudinit/stages\.py$', '^cloudinit/templater\.py$', '^cloudinit/url_helper\.py$', '^conftest\.py$', @@ -93,7 +92,6 @@ exclude=[ '^tests/unittests/test_subp\.py$', '^tests/unittests/test_templating\.py$', '^tests/unittests/test_url_helper\.py$', - '^tests/unittests/test_util\.py$', '^tools/mock-meta\.py$', ] diff --git a/tests/unittests/cmd/devel/test_render.py b/tests/unittests/cmd/devel/test_render.py index d8a96cf2dcb..4d8b79de79a 100644 --- a/tests/unittests/cmd/devel/test_render.py +++ b/tests/unittests/cmd/devel/test_render.py @@ -141,26 +141,27 @@ def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation( ' "my_var"?' % user_data ) in caplog.text - @mock.patch( - "cloudinit.cmd.devel.Init.read_cfg", - side_effect=OSError(errno.EACCES, "Not allowed"), + @pytest.mark.parametrize( + "exception", + [ + (OSError(errno.EACCES, "Not allowed"),), + (OSError(errno.ENOENT, "Not allowed"),), + (IOError,), + ], ) - def test_handle_args_error_when_no_read_permission_instance_data2( - self, - m_read_cfg, + def test_handle_args_no_read_permission_init_config( + self, exception, capsys ): - """When instance_data file is unreadable, log an error.""" + """render.handle_args exists with 1 and no sys-output.""" args = self.Args(user_data=None, instance_data=None, debug=False) - with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: - with pytest.raises(SystemExit) as exc_info: - render.handle_args("anyname", args) - assert exc_info.value.code == 1 - expected_error = ( - "Error:\nFailed reading config file(s) due to permission error:\n" - "[Errno 13] Not allowed\n" - ) - assert m_stderr.getvalue() == expected_error - assert m_read_cfg.call_count == 1 + with mock.patch( + M_PATH + "read_cfg_paths", side_effect=exception + ) as m_read_cfg_paths: + assert 1 == render.handle_args("anyname", args) + assert m_read_cfg_paths.call_count == 1 + out, err = capsys.readouterr() + assert not out + assert not err # vi: ts=4 expandtab diff --git a/tests/unittests/cmd/test_query.py b/tests/unittests/cmd/test_query.py index 825dff18b83..207078fa0b8 100644 --- a/tests/unittests/cmd/test_query.py +++ b/tests/unittests/cmd/test_query.py @@ -5,7 +5,7 @@ import json import os from collections import namedtuple -from io import BytesIO, StringIO +from io import BytesIO from textwrap import dedent import pytest @@ -110,7 +110,7 @@ def test_handle_args_error_on_missing_param(self, caplog, capsys): ), ), ) - def test_handle_args_error_on_invalid_vaname_paths( + def test_handle_args_error_on_invalid_varname_paths( self, inst_data, varname, expected_error, caplog, tmpdir ): """Error when varname is not a valid instance-data variable path.""" @@ -175,15 +175,18 @@ def test_handle_args_error_when_no_read_permission_instance_data( msg = "No read permission on '%s'. Try sudo" % noread_fn assert msg in caplog.text - @mock.patch( - "cloudinit.cmd.devel.Init.read_cfg", - side_effect=OSError(errno.EACCES, "Not allowed"), + @pytest.mark.parametrize( + "exception", + [ + (OSError(errno.EACCES, "Not allowed"),), + (OSError(errno.ENOENT, "Not allowed"),), + (IOError,), + ], ) def test_handle_args_error_when_no_read_permission_init_cfg( - self, - m_read_cfg, + self, exception, capsys ): - """When init cfg file is unreadable, log an error.""" + """query.handle_status_args exists with 1 and no sys-output.""" args = self.Args( debug=False, dump_all=True, @@ -194,16 +197,15 @@ def test_handle_args_error_when_no_read_permission_init_cfg( vendor_data=None, varname=None, ) - with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: - with pytest.raises(SystemExit) as exc_info: - query.handle_args("anyname", args) - assert exc_info.value.code == 1 - expected_error = ( - "Error:\nFailed reading config file(s) due to permission error:\n" - "[Errno 13] Not allowed\n" - ) - assert m_stderr.getvalue() == expected_error - assert m_read_cfg.call_count == 1 + with mock.patch( + M_PATH + "read_cfg_paths", + side_effect=exception, + ) as m_read_cfg_paths: + query.handle_args("anyname", args) + assert m_read_cfg_paths.call_count == 1 + out, err = capsys.readouterr() + assert not out + assert not err def test_handle_args_defaults_instance_data(self, caplog, tmpdir): """When no instance_data argument, default to configured run_dir.""" diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py index 79f47992909..2a613aee222 100644 --- a/tests/unittests/cmd/test_status.py +++ b/tests/unittests/cmd/test_status.py @@ -523,56 +523,37 @@ def test_status_main(self, m_read_cfg_paths, config: Config): assert e.value.code == 0 assert m_stdout.getvalue() == "status: running\n" - @mock.patch( - "cloudinit.cmd.devel.Init.read_cfg", - side_effect=OSError(errno.EACCES, "Not allowed"), - ) - def test_status_no_read_permission_init_config(self, m_read_cfg): - """status.handle_status_args outputs to stderr and exists with 1 if - some init cfg file has no user permissions. - """ - - cmdargs = MyArgs(long=False, wait=True) - with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: - with pytest.raises(SystemExit) as exc_info: - wrap_and_call( - M_NAME, - { - "sleep": {"side_effect": lambda *_: None}, - "_is_cloudinit_disabled": (False, ""), - }, - status.handle_status_args, - "ignored", - cmdargs, - ) - assert exc_info.value.code == 1 - expected_error = ( - "Error:\nFailed reading config file(s) due to permission error:\n" - "[Errno 13] Not allowed\n" - ) - assert m_stderr.getvalue() == expected_error - assert m_read_cfg.call_count == 1 - - @mock.patch( - "cloudinit.cmd.devel.Init.read_cfg", - side_effect=OSError(errno.EACCES, "Not allowed"), + @pytest.mark.parametrize( + "exception", + [ + (OSError(errno.EACCES, "Not allowed"),), + (OSError(errno.ENOENT, "Not allowed"),), + (IOError,), + ], ) - def test_get_status_details_no_read_permission_init_config( - self, m_read_cfg + def test_handle_args_no_read_permission_init_config( + self, exception, capsys ): - """status.get_status_details outputs to stderr and exists with 1 if - some init cfg file has no user permissions. - """ - with mock.patch("sys.stderr", new_callable=StringIO) as m_stderr: - with pytest.raises(SystemExit) as exc_info: - status.get_status_details() - assert exc_info.value.code == 1 - expected_error = ( - "Error:\nFailed reading config file(s) due to permission error:\n" - "[Errno 13] Not allowed\n" - ) - assert m_stderr.getvalue() == expected_error - assert m_read_cfg.call_count == 1 + """status.handle_status_args exists with 1 and no sys-output.""" + cmdargs = MyArgs(long=False, wait=True) + with mock.patch( + M_PATH + "read_cfg_paths", + side_effect=exception, + ) as m_read_cfg_paths: + assert 1 == wrap_and_call( + M_NAME, + { + "sleep": {"side_effect": lambda *_: None}, + "_is_cloudinit_disabled": (False, ""), + }, + status.handle_status_args, + "ignored", + cmdargs, + ) + assert m_read_cfg_paths.call_count == 1 + out, err = capsys.readouterr() + assert not out + assert not err # vi: ts=4 expandtab syntax=python diff --git a/tests/unittests/test_stages.py b/tests/unittests/test_stages.py index fdf0e490550..cf85141a01d 100644 --- a/tests/unittests/test_stages.py +++ b/tests/unittests/test_stages.py @@ -1,6 +1,7 @@ # This file is part of cloud-init. See LICENSE file for license information. """Tests related to cloudinit.stages module.""" +import errno import os import stat @@ -13,6 +14,7 @@ from tests.unittests.helpers import mock TEST_INSTANCE_ID = "i-testing" +M_PATH = "cloudinit.stages." class FakeDataSource(sources.DataSource): @@ -58,8 +60,8 @@ def test_wb__find_networking_config_disabled(self): write_file(disable_file, "") assert (None, disable_file) == self.init._find_networking_config() - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "net_config", [ @@ -80,8 +82,8 @@ def test_wb__find_networking_config_disabled_by_kernel( assert caplog.records[0].levelname == "DEBUG" assert "network config disabled by cmdline" in caplog.text - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "net_config", [ @@ -102,8 +104,8 @@ def test_wb__find_networking_config_disabled_by_initrd( assert caplog.records[0].levelname == "DEBUG" assert "network config disabled by initramfs" in caplog.text - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "net_config", [ @@ -130,8 +132,8 @@ def test_wb__find_networking_config_disabled_by_datasrc( assert caplog.records[0].levelname == "DEBUG" assert "network config disabled by ds" in caplog.text - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "net_config", [ @@ -156,8 +158,8 @@ def test_wb__find_networking_config_disabled_by_sysconfig( assert caplog.records[0].levelname == "DEBUG" assert "network config disabled by system_cfg" in caplog.text - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -188,8 +190,8 @@ def test__find_networking_config_uses_datasrc_order( NetworkConfigSource.DS, ) == self.init._find_networking_config() - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -217,8 +219,8 @@ def test__find_networking_config_warns_if_datasrc_uses_invalid_src( in caplog.text ) - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -246,8 +248,8 @@ def test__find_networking_config_warns_if_datasrc_uses_unavailable_src( in caplog.text ) - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -273,8 +275,8 @@ def test_wb__find_networking_config_returns_kernel( NetworkConfigSource.CMD_LINE, ) == self.init._find_networking_config() - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -300,8 +302,8 @@ def test_wb__find_networking_config_returns_initramfs( NetworkConfigSource.INITRAMFS, ) == self.init._find_networking_config() - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -327,8 +329,8 @@ def test_wb__find_networking_config_returns_system_cfg( NetworkConfigSource.SYSTEM_CFG, ) == self.init._find_networking_config() - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") @pytest.mark.parametrize( "in_config,out_config", [ @@ -348,8 +350,8 @@ def test_wb__find_networking_config_returns_datasrc_cfg( NetworkConfigSource.DS, ) == self.init._find_networking_config() - @mock.patch("cloudinit.stages.cmdline.read_initramfs_config") - @mock.patch("cloudinit.stages.cmdline.read_kernel_cmdline_config") + @mock.patch(M_PATH + "cmdline.read_initramfs_config") + @mock.patch(M_PATH + "cmdline.read_kernel_cmdline_config") def test_wb__find_networking_config_returns_fallback( self, m_cmdline, m_initramfs, caplog ): @@ -577,13 +579,13 @@ def init(self, paths): As it is replaced with a mock, consumers of this fixture can set `init._cfg` if the default empty dict configuration is not appropriate. """ - with mock.patch("cloudinit.stages.util.ensure_dirs"): + with mock.patch(M_PATH + "util.ensure_dirs"): init = stages.Init() init._cfg = {} init._paths = paths yield init - @mock.patch("cloudinit.stages.util.ensure_file") + @mock.patch(M_PATH + "util.ensure_file") def test_ensure_file_not_called_if_no_log_file_configured( self, m_ensure_file, init ): @@ -621,3 +623,35 @@ def test_existing_file_permissions_are_not_modified(self, init, tmpdir): init._initialize_filesystem() assert mode == stat.S_IMODE(log_file.stat().mode) + + +class TestStagesNonRootUser: + @mock.patch(M_PATH + "util.read_conf_from_cmdline", return_value={}) + @mock.patch( + "cloudinit.util.read_conf", + side_effect=OSError(errno.EACCES, "Not allowed"), + ) + def test_init_read_cfg_no_permissions_init_cfg( + self, m_read_conf, m_read_conf_from_cmdline, caplog, capsys + ): + """If a user has not read permission to read base config then + there is no exception nor stderr output and the user is informed via + logging warnings. + + Note: This is used in cmd, therefore want to keep the invariant of + not outputing to the console and log file permission errors. + """ + init = stages.Init() + init.read_cfg() + assert init.paths + out, err = capsys.readouterr() + assert not out + assert not err + msgs = [ + "REDACTED config part /etc/cloud/cloud.cfg for non-root user", + "REDACTED config part /run/cloud-init/cloud.cfg for non-root user", + ] + for msg in msgs: + assert caplog.text.count(msg) == 1 + assert m_read_conf.call_count > 0 + assert m_read_conf_from_cmdline.call_count > 0 diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index d22d774760d..55461a07dd0 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -3,6 +3,7 @@ """Tests for cloudinit.util""" import base64 +import errno import io import json import logging @@ -13,6 +14,7 @@ import stat import tempfile from textwrap import dedent +from typing import Tuple from unittest import mock import pytest @@ -24,6 +26,7 @@ from tests.unittests.helpers import CiTestCase LOG = logging.getLogger(__name__) +M_PATH = "cloudinit.util." MOUNT_INFO = [ "68 0 8:3 / / ro,relatime shared:1 - btrfs /dev/sda1 ro,attr2,inode64", @@ -336,30 +339,91 @@ def get_hostname(self, fqdn=None, metadata_only=None): return self.hostname -class TestUtil(CiTestCase): +class TestUtil: def test_parse_mount_info_no_opts_no_arg(self): result = util.parse_mount_info("/home", MOUNT_INFO, LOG) - self.assertEqual(("/dev/sda2", "xfs", "/home"), result) + assert ("/dev/sda2", "xfs", "/home") == result def test_parse_mount_info_no_opts_arg(self): result = util.parse_mount_info("/home", MOUNT_INFO, LOG, False) - self.assertEqual(("/dev/sda2", "xfs", "/home"), result) + assert ("/dev/sda2", "xfs", "/home") == result def test_parse_mount_info_with_opts(self): result = util.parse_mount_info("/", MOUNT_INFO, LOG, True) - self.assertEqual(("/dev/sda1", "btrfs", "/", "ro,relatime"), result) + assert ("/dev/sda1", "btrfs", "/", "ro,relatime") == result - @mock.patch("cloudinit.util.get_mount_info") + @mock.patch(M_PATH + "get_mount_info") def test_mount_is_rw(self, m_mount_info): m_mount_info.return_value = ("/dev/sda1", "btrfs", "/", "rw,relatime") is_rw = util.mount_is_read_write("/") - self.assertEqual(is_rw, True) + assert is_rw is True - @mock.patch("cloudinit.util.get_mount_info") + @mock.patch(M_PATH + "get_mount_info") def test_mount_is_ro(self, m_mount_info): m_mount_info.return_value = ("/dev/sda1", "btrfs", "/", "ro,relatime") is_rw = util.mount_is_read_write("/") - self.assertEqual(is_rw, False) + assert is_rw is False + + @mock.patch( + M_PATH + "read_conf", + side_effect=(OSError(errno.EACCES, "Not allowed"), {"0": "0"}), + ) + def test_read_conf_d_no_permissions( + self, m_read_conf, caplog, capsys, tmpdir + ): + """If a user has not read permission to read a config file then + there is no exception nor stderr output and the user is informed via + logging warnings. + + Note: This is used in cmd, therefore want to keep the invariant of + not outputing to the console and log file permission errors. + """ + confs = [] + for i in range(2): + confs.append(tmpdir.join(f"conf-{i}.cfg")) + confs[i].write("{}") + assert {"0": "0"} == util.read_conf_d(tmpdir) + assert ( + caplog.text.count( + f"REDACTED config part {tmpdir}/conf-1.cfg for non-root user" + ) + == 1 + ) + assert m_read_conf.call_count == 2 + out, err = capsys.readouterr() + assert not out + assert not err + + @mock.patch( + M_PATH + "read_conf", side_effect=OSError(errno.EACCES, "Not allowed") + ) + def test_read_conf_with_confd_no_permissions( + self, m_read_conf, caplog, capsys, tmpdir + ): + """ + If a user has not read permission then there is a exception, + sys output is empty and the user is informed via logging warnings. + + Note: This is used in cmd, therefore want to keep the invariant of + not outputing to the console and log file permission errors. + """ + conf_fn = tmpdir.join("conf.cfg") + with pytest.raises(OSError) as exc_info: + util.read_conf_with_confd(conf_fn) + assert exc_info.value.errno == errno.EACCES + assert exc_info.value.strerror == "Not allowed" + assert ( + caplog.text.count( + f"REDACTED config part {conf_fn} for non-root user" + ) + == 1 + ) + assert m_read_conf.call_count == 1 + out, err = capsys.readouterr() + assert not out + assert not err + + # read_conf_with_confd class TestSymlink(CiTestCase): @@ -412,9 +476,9 @@ def test_sym_link_create_dangling(self): class TestUptime(CiTestCase): - @mock.patch("cloudinit.util.boottime") - @mock.patch("cloudinit.util.os.path.exists") - @mock.patch("cloudinit.util.time.time") + @mock.patch(M_PATH + "boottime") + @mock.patch(M_PATH + "os.path.exists") + @mock.patch(M_PATH + "time.time") def test_uptime_non_linux_path(self, m_time, m_exists, m_boottime): boottime = 1000.0 uptime = 10.0 @@ -688,7 +752,7 @@ def redhat_release_exists(self, path): if path == "/etc/redhat-release": return 1 - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists): """Verify we get the correct name if the os-release file has the distro name in quotes""" @@ -697,7 +761,7 @@ def test_get_linux_distro_quoted_name(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("sles", "12.3", platform.machine()), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_distro_bare_name(self, m_os_release, m_path_exists): """Verify we get the correct name if the os-release file does not have the distro name in quotes""" @@ -708,7 +772,7 @@ def test_get_linux_distro_bare_name(self, m_os_release, m_path_exists): @mock.patch("platform.system") @mock.patch("platform.release") - @mock.patch("cloudinit.util._parse_redhat_release") + @mock.patch(M_PATH + "_parse_redhat_release") def test_get_linux_freebsd( self, m_parse_redhat_release, @@ -725,7 +789,7 @@ def test_get_linux_freebsd( dist = util.get_linux_distro() self.assertEqual(("freebsd", "12.0-RELEASE-p10", ""), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_centos6(self, m_os_release, m_path_exists): """Verify we get the correct name and release name on CentOS 6.""" m_os_release.return_value = REDHAT_RELEASE_CENTOS_6 @@ -733,7 +797,7 @@ def test_get_linux_centos6(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("centos", "6.10", "Final"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_centos7_redhat_release(self, m_os_release, m_exists): """Verify the correct release info on CentOS 7 without os-release.""" m_os_release.return_value = REDHAT_RELEASE_CENTOS_7 @@ -741,7 +805,7 @@ def test_get_linux_centos7_redhat_release(self, m_os_release, m_exists): dist = util.get_linux_distro() self.assertEqual(("centos", "7.5.1804", "Core"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_redhat7_osrelease(self, m_os_release, m_path_exists): """Verify redhat 7 read from os-release.""" m_os_release.return_value = OS_RELEASE_REDHAT_7 @@ -749,7 +813,7 @@ def test_get_linux_redhat7_osrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("redhat", "7.5", "Maipo"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_redhat7_rhrelease(self, m_os_release, m_path_exists): """Verify redhat 7 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_REDHAT_7 @@ -757,7 +821,7 @@ def test_get_linux_redhat7_rhrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("redhat", "7.5", "Maipo"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_redhat6_rhrelease(self, m_os_release, m_path_exists): """Verify redhat 6 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_REDHAT_6 @@ -765,7 +829,7 @@ def test_get_linux_redhat6_rhrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("redhat", "6.10", "Santiago"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_copr_centos(self, m_os_release, m_path_exists): """Verify we get the correct name and release name on COPR CentOS.""" m_os_release.return_value = OS_RELEASE_CENTOS @@ -773,7 +837,7 @@ def test_get_linux_copr_centos(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("centos", "7", "Core"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_almalinux8_rhrelease(self, m_os_release, m_path_exists): """Verify almalinux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_ALMALINUX_8 @@ -781,7 +845,7 @@ def test_get_linux_almalinux8_rhrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("almalinux", "8.3", "Purple Manul"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_almalinux8_osrelease(self, m_os_release, m_path_exists): """Verify almalinux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_ALMALINUX_8 @@ -789,7 +853,7 @@ def test_get_linux_almalinux8_osrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("almalinux", "8.3", "Purple Manul"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_eurolinux7_rhrelease(self, m_os_release, m_path_exists): """Verify eurolinux 7 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_EUROLINUX_7 @@ -797,7 +861,7 @@ def test_get_linux_eurolinux7_rhrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("eurolinux", "7.9", "Minsk"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_eurolinux7_osrelease(self, m_os_release, m_path_exists): """Verify eurolinux 7 read from os-release.""" m_os_release.return_value = OS_RELEASE_EUROLINUX_7 @@ -805,7 +869,7 @@ def test_get_linux_eurolinux7_osrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("eurolinux", "7.9", "Minsk"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_eurolinux8_rhrelease(self, m_os_release, m_path_exists): """Verify eurolinux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_EUROLINUX_8 @@ -813,7 +877,7 @@ def test_get_linux_eurolinux8_rhrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("eurolinux", "8.4", "Vaduz"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_eurolinux8_osrelease(self, m_os_release, m_path_exists): """Verify eurolinux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_EUROLINUX_8 @@ -821,7 +885,7 @@ def test_get_linux_eurolinux8_osrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("eurolinux", "8.4", "Vaduz"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_miraclelinux8_rhrelease( self, m_os_release, m_path_exists ): @@ -831,7 +895,7 @@ def test_get_linux_miraclelinux8_rhrelease( dist = util.get_linux_distro() self.assertEqual(("miracle", "8.4", "Peony"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_miraclelinux8_osrelease( self, m_os_release, m_path_exists ): @@ -841,7 +905,7 @@ def test_get_linux_miraclelinux8_osrelease( dist = util.get_linux_distro() self.assertEqual(("miraclelinux", "8", "Peony"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_rocky8_rhrelease(self, m_os_release, m_path_exists): """Verify rocky linux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_ROCKY_8 @@ -849,7 +913,7 @@ def test_get_linux_rocky8_rhrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("rocky", "8.3", "Green Obsidian"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_rocky8_osrelease(self, m_os_release, m_path_exists): """Verify rocky linux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_ROCKY_8 @@ -857,7 +921,7 @@ def test_get_linux_rocky8_osrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("rocky", "8.3", "Green Obsidian"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_virtuozzo8_rhrelease(self, m_os_release, m_path_exists): """Verify virtuozzo linux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_VIRTUOZZO_8 @@ -865,7 +929,7 @@ def test_get_linux_virtuozzo8_rhrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("virtuozzo", "8", "Virtuozzo Linux"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_virtuozzo8_osrelease(self, m_os_release, m_path_exists): """Verify virtuozzo linux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_VIRTUOZZO_8 @@ -873,7 +937,7 @@ def test_get_linux_virtuozzo8_osrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("virtuozzo", "8", "Virtuozzo Linux"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_cloud8_rhrelease(self, m_os_release, m_path_exists): """Verify cloudlinux 8 read from redhat-release.""" m_os_release.return_value = REDHAT_RELEASE_CLOUDLINUX_8 @@ -881,7 +945,7 @@ def test_get_linux_cloud8_rhrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("cloudlinux", "8.4", "Valery Rozhdestvensky"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_cloud8_osrelease(self, m_os_release, m_path_exists): """Verify cloudlinux 8 read from os-release.""" m_os_release.return_value = OS_RELEASE_CLOUDLINUX_8 @@ -889,7 +953,7 @@ def test_get_linux_cloud8_osrelease(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("cloudlinux", "8.4", "Valery Rozhdestvensky"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_debian(self, m_os_release, m_path_exists): """Verify we get the correct name and release name on Debian.""" m_os_release.return_value = OS_RELEASE_DEBIAN @@ -897,7 +961,7 @@ def test_get_linux_debian(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("debian", "9", "stretch"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_openeuler(self, m_os_release, m_path_exists): """Verify get the correct name and release name on Openeuler.""" m_os_release.return_value = OS_RELEASE_OPENEULER_20 @@ -905,7 +969,7 @@ def test_get_linux_openeuler(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("openEuler", "20.03", "LTS-SP2"), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_opensuse(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on openSUSE prior to openSUSE Leap 15. @@ -915,7 +979,7 @@ def test_get_linux_opensuse(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("opensuse", "42.3", platform.machine()), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_opensuse_l15(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on openSUSE for openSUSE Leap 15.0 and later. @@ -925,7 +989,7 @@ def test_get_linux_opensuse_l15(self, m_os_release, m_path_exists): dist = util.get_linux_distro() self.assertEqual(("opensuse-leap", "15.0", platform.machine()), dist) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_opensuse_tw(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on openSUSE for openSUSE Tumbleweed @@ -937,7 +1001,7 @@ def test_get_linux_opensuse_tw(self, m_os_release, m_path_exists): ("opensuse-tumbleweed", "20180920", platform.machine()), dist ) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_get_linux_photon_os_release(self, m_os_release, m_path_exists): """Verify we get the correct name and machine arch on PhotonOS""" m_os_release.return_value = OS_RELEASE_PHOTON @@ -1059,6 +1123,9 @@ def test_is_lxd_false_when_sock_device_absent(self, m_exists): class TestReadCcFromCmdline: + + random_string: Tuple + if hasattr(pytest, "param"): random_string = pytest.param( CiTestCase.random_string(), None, id="random_string" @@ -1182,8 +1249,8 @@ def already_mounted_device_and_mountdict(self): """Mock an already-mounted device, and yield (device, mount dict)""" device = "/dev/fake0" mountpoint = "/mnt/fake" - with mock.patch("cloudinit.util.subp.subp"): - with mock.patch("cloudinit.util.mounts") as m_mounts: + with mock.patch(M_PATH + "subp.subp"): + with mock.patch(M_PATH + "mounts") as m_mounts: mounts = {device: {"mountpoint": mountpoint}} m_mounts.return_value = mounts yield device, mounts[device] @@ -1206,9 +1273,9 @@ def already_mounted_device(self, already_mounted_device_and_mountdict): ("ufs", "ufs"), ], ) - @mock.patch("cloudinit.util.is_Linux", autospec=True) - @mock.patch("cloudinit.util.is_BSD", autospec=True) - @mock.patch("cloudinit.util.subp.subp") + @mock.patch(M_PATH + "is_Linux", autospec=True) + @mock.patch(M_PATH + "is_BSD", autospec=True) + @mock.patch(M_PATH + "subp.subp") @mock.patch("cloudinit.temp_utils.tempdir", autospec=True) def test_normalize_mtype_on_bsd( self, m_tmpdir, m_subp, m_is_BSD, m_is_Linux, mtype, expected @@ -1245,7 +1312,7 @@ def test_typeerror_raised_for_invalid_mtype(self, invalid_mtype): with pytest.raises(TypeError): util.mount_cb(mock.Mock(), mock.Mock(), mtype=invalid_mtype) - @mock.patch("cloudinit.util.subp.subp") + @mock.patch(M_PATH + "subp.subp") def test_already_mounted_does_not_mount_or_umount_anything( self, m_subp, already_mounted_device ): @@ -1281,7 +1348,7 @@ def test_already_mounted_calls_callback_with_data( ] == callback.call_args_list -@mock.patch("cloudinit.util.write_file") +@mock.patch(M_PATH + "write_file") class TestEnsureFile: """Tests for ``cloudinit.util.ensure_file``.""" @@ -1326,9 +1393,9 @@ def test_static_parameters_are_passed(self, m_write_file): assert "ab" == kwargs["omode"] -@mock.patch("cloudinit.util.grp.getgrnam") -@mock.patch("cloudinit.util.os.setgid") -@mock.patch("cloudinit.util.os.umask") +@mock.patch(M_PATH + "grp.getgrnam") +@mock.patch(M_PATH + "os.setgid") +@mock.patch(M_PATH + "os.umask") class TestRedirectOutputPreexecFn: """This tests specifically the preexec_fn used in redirect_output.""" @@ -1344,7 +1411,7 @@ def preexec_fn(self, request): args = (test_string, None) elif request.param == "errfmt": args = (None, test_string) - with mock.patch("cloudinit.util.subprocess.Popen") as m_popen: + with mock.patch(M_PATH + "subprocess.Popen") as m_popen: util.redirect_output(*args) assert 1 == m_popen.call_count @@ -1778,7 +1845,7 @@ def test_raring_btrfs_root(self): expected = ("none", "tmpfs", "/run/lock") self.assertEqual(expected, util.parse_mount_info("/run/lock", lines)) - @mock.patch("cloudinit.util.os") + @mock.patch(M_PATH + "os") @mock.patch("cloudinit.subp.subp") def test_get_device_info_from_zpool(self, zpool_output, m_os): # mock /dev/zfs exists @@ -1794,7 +1861,7 @@ def test_get_device_info_from_zpool(self, zpool_output, m_os): self.assertIsNotNone(ret) m_os.path.exists.assert_called_with("/dev/zfs") - @mock.patch("cloudinit.util.os") + @mock.patch(M_PATH + "os") def test_get_device_info_from_zpool_no_dev_zfs(self, m_os): # mock /dev/zfs missing m_os.path.exists.return_value = False @@ -1802,7 +1869,7 @@ def test_get_device_info_from_zpool_no_dev_zfs(self, m_os): ret = util.get_device_info_from_zpool("vmzroot") self.assertIsNone(ret) - @mock.patch("cloudinit.util.os") + @mock.patch(M_PATH + "os") @mock.patch("cloudinit.subp.subp") def test_get_device_info_from_zpool_handles_no_zpool(self, m_sub, m_os): """Handle case where there is no zpool command""" @@ -1812,7 +1879,7 @@ def test_get_device_info_from_zpool_handles_no_zpool(self, m_sub, m_os): ret = util.get_device_info_from_zpool("vmzroot") self.assertIsNone(ret) - @mock.patch("cloudinit.util.os") + @mock.patch(M_PATH + "os") @mock.patch("cloudinit.subp.subp") def test_get_device_info_from_zpool_on_error(self, zpool_output, m_os): # mock /dev/zfs exists @@ -1879,7 +1946,7 @@ def test_is_x86_unmatched_types(self): util.is_x86(arch), 'Expected not is_x86 for arch "%s"' % arch ) - @mock.patch("cloudinit.util.os.uname") + @mock.patch(M_PATH + "os.uname") def test_is_x86_calls_uname_for_architecture(self, m_uname): """is_x86 returns True if platform from uname matches.""" m_uname.return_value = [0, 1, 2, 3, "x86_64"] @@ -1987,7 +2054,7 @@ def test_logs_dont_go_to_stdout_if_fallback_to_stdout_is_false(self): self.assertEqual("", self.stdout.getvalue()) @mock.patch( - "cloudinit.util.write_to_console", + M_PATH + "write_to_console", mock.Mock(side_effect=OSError("Failed to write to console")), ) def test_logs_go_to_stdout_if_writing_to_console_fails_and_fallback_true( @@ -2001,7 +2068,7 @@ def test_logs_go_to_stdout_if_writing_to_console_fails_and_fallback_true( ) @mock.patch( - "cloudinit.util.write_to_console", + M_PATH + "write_to_console", mock.Mock(side_effect=OSError("Failed to write to console")), ) def test_logs_go_nowhere_if_writing_to_console_fails_and_fallback_false( @@ -2210,7 +2277,7 @@ def test_id_in_os_release(self): self.reRoot(root_d) self.assertTrue(util.system_is_snappy()) - @mock.patch("cloudinit.util.get_cmdline") + @mock.patch(M_PATH + "get_cmdline") def test_bad_content_in_os_release_no_effect(self, m_cmdline): """malformed os-release should not raise exception.""" m_cmdline.return_value = "root=/dev/sda" @@ -2220,7 +2287,7 @@ def test_bad_content_in_os_release_no_effect(self, m_cmdline): self.reRoot() self.assertFalse(util.system_is_snappy()) - @mock.patch("cloudinit.util.get_cmdline") + @mock.patch(M_PATH + "get_cmdline") def test_snap_core_in_cmdline_is_snappy(self, m_cmdline): """The string snap_core= in kernel cmdline indicates snappy.""" cmdline = ( @@ -2233,7 +2300,7 @@ def test_snap_core_in_cmdline_is_snappy(self, m_cmdline): self.assertTrue(util.system_is_snappy()) self.assertTrue(m_cmdline.call_count > 0) - @mock.patch("cloudinit.util.get_cmdline") + @mock.patch(M_PATH + "get_cmdline") def test_nothing_found_is_not_snappy(self, m_cmdline): """If no positive identification, then not snappy.""" m_cmdline.return_value = "root=/dev/sda" @@ -2241,7 +2308,7 @@ def test_nothing_found_is_not_snappy(self, m_cmdline): self.assertFalse(util.system_is_snappy()) self.assertTrue(m_cmdline.call_count > 0) - @mock.patch("cloudinit.util.get_cmdline") + @mock.patch(M_PATH + "get_cmdline") def test_channel_ini_with_snappy_is_snappy(self, m_cmdline): """A Channel.ini file with 'ubuntu-core' indicates snappy.""" m_cmdline.return_value = "root=/dev/sda" @@ -2251,7 +2318,7 @@ def test_channel_ini_with_snappy_is_snappy(self, m_cmdline): self.reRoot(root_d) self.assertTrue(util.system_is_snappy()) - @mock.patch("cloudinit.util.get_cmdline") + @mock.patch(M_PATH + "get_cmdline") def test_system_image_config_dir_is_snappy(self, m_cmdline): """Existence of /etc/system-image/config.d indicates snappy.""" m_cmdline.return_value = "root=/dev/sda" @@ -2296,7 +2363,7 @@ def _val_decoded(self, blob, encoding="utf-8", errors="replace"): # return the value portion of key=val decoded. return blob.split(b"=", 1)[1].decode(encoding, errors) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_non_utf8_in_environment(self, m_load_file): """env may have non utf-8 decodable content.""" content = self.null.join( @@ -2315,7 +2382,7 @@ def test_non_utf8_in_environment(self, m_load_file): ) self.assertEqual(1, m_load_file.call_count) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_encoding_none_returns_bytes(self, m_load_file): """encoding none returns bytes.""" lines = (self.bootflag, self.simple1, self.simple2, self.mixed) @@ -2328,7 +2395,7 @@ def test_encoding_none_returns_bytes(self, m_load_file): ) self.assertEqual(1, m_load_file.call_count) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_all_utf8_encoded(self, m_load_file): """common path where only utf-8 decodable content.""" content = self.null.join((self.simple1, self.simple2)) @@ -2338,7 +2405,7 @@ def test_all_utf8_encoded(self, m_load_file): ) self.assertEqual(1, m_load_file.call_count) - @mock.patch("cloudinit.util.load_file") + @mock.patch(M_PATH + "load_file") def test_non_existing_file_returns_empty_dict(self, m_load_file): """as implemented, a non-existing pid returns empty dict. This is how it was originally implemented.""" From 77c3477c6d88cb16f45c9d2ad63d45a57246608b Mon Sep 17 00:00:00 2001 From: Alberto Contreras Date: Wed, 11 May 2022 14:32:00 +0200 Subject: [PATCH 5/8] Fix function docstr. --- cloudinit/cmd/devel/__init__.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cloudinit/cmd/devel/__init__.py b/cloudinit/cmd/devel/__init__.py index 55eda957be8..9a8f2ebdaf6 100755 --- a/cloudinit/cmd/devel/__init__.py +++ b/cloudinit/cmd/devel/__init__.py @@ -18,10 +18,7 @@ def addLogHandlerCLI(logger, log_level): def read_cfg_paths() -> Paths: - """Return a Paths object based on the system configuration on disk. - - It handles file permission errors. - """ + """Return a Paths object based on the system configuration on disk.""" init = Init(ds_deps=[]) init.read_cfg() return init.paths From f1ff067838e79a9357a0046b0aaf61f7984d30e9 Mon Sep 17 00:00:00 2001 From: Alberto Contreras Date: Tue, 17 May 2022 10:44:12 +0200 Subject: [PATCH 6/8] Simplify file permission error handling. - Remove error handling from stages.read_runtime_config as /run/cloud-init/cloud.cfg is controlled by us. - Rework read_conf_with_confd to work even if cfgfile is not accessible by reading the confd directory only. - Remove error handling from stages.fetch_base_config as none of the functions raise. - Same in render.handle_args. - Adapt unit tests. --- cloudinit/cmd/devel/render.py | 5 +-- cloudinit/stages.py | 45 ++++++++---------------- cloudinit/util.py | 16 +++++---- tests/unittests/cmd/devel/test_render.py | 25 ------------- tests/unittests/test_stages.py | 13 +++---- tests/unittests/test_util.py | 36 ++++++++++++++----- 6 files changed, 57 insertions(+), 83 deletions(-) diff --git a/cloudinit/cmd/devel/render.py b/cloudinit/cmd/devel/render.py index cfbcce7167e..62b432d20fa 100755 --- a/cloudinit/cmd/devel/render.py +++ b/cloudinit/cmd/devel/render.py @@ -63,10 +63,7 @@ def handle_args(name, args): if args.instance_data: instance_data_fn = args.instance_data else: - try: - paths = read_cfg_paths() - except (IOError, OSError): - return 1 + paths = read_cfg_paths() uid = os.getuid() redacted_data_fn = os.path.join(paths.run_dir, INSTANCE_JSON_FILE) if uid == 0: diff --git a/cloudinit/stages.py b/cloudinit/stages.py index 02811befe2a..27af605511d 100644 --- a/cloudinit/stages.py +++ b/cloudinit/stages.py @@ -5,12 +5,10 @@ # This file is part of cloud-init. See LICENSE file for license information. import copy -import errno import os import pickle import sys from collections import namedtuple -from functools import partial from typing import Dict, Iterable, List, Optional, Set from cloudinit import cloud, distros, handlers, helpers, importer @@ -956,38 +954,23 @@ def should_run_on_boot_event(): def read_runtime_config(): - try: - return util.read_conf(RUN_CLOUD_CONFIG) - except OSError as e: - if e.errno == errno.EACCES: - LOG.warning( - "REDACTED config part %s for non-root user", - RUN_CLOUD_CONFIG, - ) - raise + return util.read_conf(RUN_CLOUD_CONFIG) def fetch_base_config(): - config_loaders = [ - # builtin config, hardcoded in settings.py. - util.get_builtin_cfg, - # Anything in your conf.d or 'default' cloud.cfg location. - partial(util.read_conf_with_confd, CLOUD_CONFIG), - # runtime config. I.e., /run/cloud-init/cloud.cfg - read_runtime_config, - # Kernel/cmdline parameters override system config - util.read_conf_from_cmdline, - ] - configs = [] - for config_loader in config_loaders: - try: - configs.append(config_loader()) - except OSError as e: - if e.errno == errno.EACCES: - pass # Already logged in `config_loaders`. - else: - raise - return util.mergemanydict(configs, reverse=True) + return util.mergemanydict( + [ + # builtin config, hardcoded in settings.py. + util.get_builtin_cfg(), + # Anything in your conf.d or 'default' cloud.cfg location. + util.read_conf_with_confd(CLOUD_CONFIG), + # runtime config. I.e., /run/cloud-init/cloud.cfg + read_runtime_config(), + # Kernel/cmdline parameters override system config + util.read_conf_from_cmdline(), + ], + reverse=True, + ) def _pkl_store(obj, fname): diff --git a/cloudinit/util.py b/cloudinit/util.py index a8ce3768029..2639478af95 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -32,6 +32,7 @@ import sys import time from base64 import b64decode, b64encode +from collections import deque from errno import EACCES, ENOENT from functools import lru_cache from typing import List @@ -1023,12 +1024,15 @@ def read_conf_d(confd): def read_conf_with_confd(cfgfile): + cfgs = deque() + cfg: dict = {} try: cfg = read_conf(cfgfile) except OSError as e: if e.errno == EACCES: LOG.warning("REDACTED config part %s for non-root user", cfgfile) - raise + else: + cfgs.append(cfg) confd = False if "conf_d" in cfg: @@ -1044,12 +1048,12 @@ def read_conf_with_confd(cfgfile): elif os.path.isdir("%s.d" % cfgfile): confd = "%s.d" % cfgfile - if not confd or not os.path.isdir(confd): - return cfg + if confd and os.path.isdir(confd): + # Conf.d settings override input configuration + confd_cfg = read_conf_d(confd) + cfgs.appendleft(confd_cfg) - # Conf.d settings override input configuration - confd_cfg = read_conf_d(confd) - return mergemanydict([confd_cfg, cfg]) + return mergemanydict(cfgs) def read_conf_from_cmdline(cmdline=None): diff --git a/tests/unittests/cmd/devel/test_render.py b/tests/unittests/cmd/devel/test_render.py index 4d8b79de79a..3bc5032f346 100644 --- a/tests/unittests/cmd/devel/test_render.py +++ b/tests/unittests/cmd/devel/test_render.py @@ -1,11 +1,8 @@ # This file is part of cloud-init. See LICENSE file for license information. -import errno from collections import namedtuple from io import StringIO -import pytest - from cloudinit.cmd.devel import render from cloudinit.helpers import Paths from cloudinit.sources import INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE @@ -141,27 +138,5 @@ def test_handle_args_warns_and_gives_up_on_invalid_jinja_operation( ' "my_var"?' % user_data ) in caplog.text - @pytest.mark.parametrize( - "exception", - [ - (OSError(errno.EACCES, "Not allowed"),), - (OSError(errno.ENOENT, "Not allowed"),), - (IOError,), - ], - ) - def test_handle_args_no_read_permission_init_config( - self, exception, capsys - ): - """render.handle_args exists with 1 and no sys-output.""" - args = self.Args(user_data=None, instance_data=None, debug=False) - with mock.patch( - M_PATH + "read_cfg_paths", side_effect=exception - ) as m_read_cfg_paths: - assert 1 == render.handle_args("anyname", args) - assert m_read_cfg_paths.call_count == 1 - out, err = capsys.readouterr() - assert not out - assert not err - # vi: ts=4 expandtab diff --git a/tests/unittests/test_stages.py b/tests/unittests/test_stages.py index cf85141a01d..5cd6cf13e11 100644 --- a/tests/unittests/test_stages.py +++ b/tests/unittests/test_stages.py @@ -629,7 +629,7 @@ class TestStagesNonRootUser: @mock.patch(M_PATH + "util.read_conf_from_cmdline", return_value={}) @mock.patch( "cloudinit.util.read_conf", - side_effect=OSError(errno.EACCES, "Not allowed"), + side_effect=(OSError(errno.EACCES, "Not allowed"), {}), ) def test_init_read_cfg_no_permissions_init_cfg( self, m_read_conf, m_read_conf_from_cmdline, caplog, capsys @@ -647,11 +647,8 @@ def test_init_read_cfg_no_permissions_init_cfg( out, err = capsys.readouterr() assert not out assert not err - msgs = [ - "REDACTED config part /etc/cloud/cloud.cfg for non-root user", - "REDACTED config part /run/cloud-init/cloud.cfg for non-root user", - ] - for msg in msgs: - assert caplog.text.count(msg) == 1 - assert m_read_conf.call_count > 0 + msg = "REDACTED config part /etc/cloud/cloud.cfg for non-root user" + assert caplog.text.count(msg) == 1 + assert m_read_conf.call_count > 1 + assert mock.call("/etc/cloud/cloud.cfg") in m_read_conf.call_args_list assert m_read_conf_from_cmdline.call_count > 0 diff --git a/tests/unittests/test_util.py b/tests/unittests/test_util.py index 55461a07dd0..bcb637871e4 100644 --- a/tests/unittests/test_util.py +++ b/tests/unittests/test_util.py @@ -13,6 +13,7 @@ import shutil import stat import tempfile +from collections import deque from textwrap import dedent from typing import Tuple from unittest import mock @@ -394,24 +395,40 @@ def test_read_conf_d_no_permissions( assert not out assert not err + @pytest.mark.parametrize( + "create_confd,expected_call", + [ + (False, mock.call(deque())), + (True, mock.call(deque([{"my_config": "foo"}]))), + ], + ) + @mock.patch(M_PATH + "mergemanydict") + @mock.patch(M_PATH + "read_conf_d", return_value={"my_config": "foo"}) @mock.patch( M_PATH + "read_conf", side_effect=OSError(errno.EACCES, "Not allowed") ) def test_read_conf_with_confd_no_permissions( - self, m_read_conf, caplog, capsys, tmpdir + self, + m_read_conf, + m_read_confd, + m_mergemanydict, + create_confd, + expected_call, + caplog, + capsys, + tmpdir, ): - """ - If a user has not read permission then there is a exception, + """Read a conf file without permission. + sys output is empty and the user is informed via logging warnings. Note: This is used in cmd, therefore want to keep the invariant of not outputing to the console and log file permission errors. """ conf_fn = tmpdir.join("conf.cfg") - with pytest.raises(OSError) as exc_info: - util.read_conf_with_confd(conf_fn) - assert exc_info.value.errno == errno.EACCES - assert exc_info.value.strerror == "Not allowed" + if create_confd: + confd_fn = tmpdir.mkdir("conf.cfg.d") + util.read_conf_with_confd(conf_fn) assert ( caplog.text.count( f"REDACTED config part {conf_fn} for non-root user" @@ -422,8 +439,9 @@ def test_read_conf_with_confd_no_permissions( out, err = capsys.readouterr() assert not out assert not err - - # read_conf_with_confd + if create_confd: + assert [mock.call(confd_fn)] == m_read_confd.call_args_list + assert [expected_call] == m_mergemanydict.call_args_list class TestSymlink(CiTestCase): From 338ad91f08f66294bb16f28cc7a65b92a6563aae Mon Sep 17 00:00:00 2001 From: Alberto Contreras Date: Tue, 17 May 2022 11:02:12 +0200 Subject: [PATCH 7/8] Remove error handling in status.handle_status_args Do not except unhandled exceptions. --- cloudinit/cmd/status.py | 5 +---- tests/unittests/cmd/test_status.py | 33 ------------------------------ 2 files changed, 1 insertion(+), 37 deletions(-) diff --git a/cloudinit/cmd/status.py b/cloudinit/cmd/status.py index 1b32d473678..1c7c209be5b 100755 --- a/cloudinit/cmd/status.py +++ b/cloudinit/cmd/status.py @@ -68,10 +68,7 @@ def get_parser(parser=None): def handle_status_args(name, args) -> int: """Handle calls to 'cloud-init status' as a subcommand.""" # Read configured paths - try: - paths = read_cfg_paths() - except (IOError, OSError): - return 1 + paths = read_cfg_paths() status, status_detail, time = get_status_details(paths) if args.wait: while status in (UXAppStatus.NOT_RUN, UXAppStatus.RUNNING): diff --git a/tests/unittests/cmd/test_status.py b/tests/unittests/cmd/test_status.py index 2a613aee222..e9169a55d53 100644 --- a/tests/unittests/cmd/test_status.py +++ b/tests/unittests/cmd/test_status.py @@ -1,6 +1,5 @@ # This file is part of cloud-init. See LICENSE file for license information. -import errno import os from collections import namedtuple from io import StringIO @@ -523,37 +522,5 @@ def test_status_main(self, m_read_cfg_paths, config: Config): assert e.value.code == 0 assert m_stdout.getvalue() == "status: running\n" - @pytest.mark.parametrize( - "exception", - [ - (OSError(errno.EACCES, "Not allowed"),), - (OSError(errno.ENOENT, "Not allowed"),), - (IOError,), - ], - ) - def test_handle_args_no_read_permission_init_config( - self, exception, capsys - ): - """status.handle_status_args exists with 1 and no sys-output.""" - cmdargs = MyArgs(long=False, wait=True) - with mock.patch( - M_PATH + "read_cfg_paths", - side_effect=exception, - ) as m_read_cfg_paths: - assert 1 == wrap_and_call( - M_NAME, - { - "sleep": {"side_effect": lambda *_: None}, - "_is_cloudinit_disabled": (False, ""), - }, - status.handle_status_args, - "ignored", - cmdargs, - ) - assert m_read_cfg_paths.call_count == 1 - out, err = capsys.readouterr() - assert not out - assert not err - # vi: ts=4 expandtab syntax=python From e0f9b786251a0457d545ca1abb110c82d3071e74 Mon Sep 17 00:00:00 2001 From: Alberto Contreras Date: Tue, 17 May 2022 11:27:57 +0200 Subject: [PATCH 8/8] Remove unneeded test. --- tests/unittests/test_stages.py | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/tests/unittests/test_stages.py b/tests/unittests/test_stages.py index 5cd6cf13e11..9fa2e6299b5 100644 --- a/tests/unittests/test_stages.py +++ b/tests/unittests/test_stages.py @@ -1,7 +1,6 @@ # This file is part of cloud-init. See LICENSE file for license information. """Tests related to cloudinit.stages module.""" -import errno import os import stat @@ -623,32 +622,3 @@ def test_existing_file_permissions_are_not_modified(self, init, tmpdir): init._initialize_filesystem() assert mode == stat.S_IMODE(log_file.stat().mode) - - -class TestStagesNonRootUser: - @mock.patch(M_PATH + "util.read_conf_from_cmdline", return_value={}) - @mock.patch( - "cloudinit.util.read_conf", - side_effect=(OSError(errno.EACCES, "Not allowed"), {}), - ) - def test_init_read_cfg_no_permissions_init_cfg( - self, m_read_conf, m_read_conf_from_cmdline, caplog, capsys - ): - """If a user has not read permission to read base config then - there is no exception nor stderr output and the user is informed via - logging warnings. - - Note: This is used in cmd, therefore want to keep the invariant of - not outputing to the console and log file permission errors. - """ - init = stages.Init() - init.read_cfg() - assert init.paths - out, err = capsys.readouterr() - assert not out - assert not err - msg = "REDACTED config part /etc/cloud/cloud.cfg for non-root user" - assert caplog.text.count(msg) == 1 - assert m_read_conf.call_count > 1 - assert mock.call("/etc/cloud/cloud.cfg") in m_read_conf.call_args_list - assert m_read_conf_from_cmdline.call_count > 0