From 2bc11dc34fba7976907bdd18e73798155a35f5f4 Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Sun, 4 Jan 2026 23:43:23 +0300 Subject: [PATCH 1/2] PostgresNode::is_started is read-only and does not used internally PostgresNode::is_started provides information about a manually started node. It is "set" in start method and is reset in stop, kill methods. These methods do not verify this flag. So we cannot stop and start our node twice. An updated implementation of kill method raises InvalidOperationException when our node is not running. --- src/node.py | 69 +++++++--- src/raise_error.py | 15 +++ tests/test_raise_error.py | 51 ++++++-- tests/test_testgres_common.py | 228 +++++++++++++++++++++++++++++++++- tests/test_testgres_local.py | 4 +- 5 files changed, 335 insertions(+), 32 deletions(-) diff --git a/src/node.py b/src/node.py index 915074c..640eb41 100644 --- a/src/node.py +++ b/src/node.py @@ -160,11 +160,14 @@ class PostgresNode(object): # a max number of node start attempts _C_MAX_START_ATEMPTS = 5 + _C_PM_PID__IS_NOT_DETECTED = -1 + _name: typing.Optional[str] _port: typing.Optional[int] _should_free_port: bool _os_ops: OsOperations _port_manager: typing.Optional[PortManager] + _manually_started_pm_pid: typing.Optional[int] def __init__(self, name=None, @@ -251,7 +254,7 @@ def __init__(self, self.pg_log_name = self.pg_log_file # Node state - self.is_started = False + self._manually_started_pm_pid = None def __enter__(self): return self @@ -380,6 +383,14 @@ def pid(self): assert type(x.pid) == int # noqa: E721 return x.pid + @property + def is_started(self) -> bool: + if self._manually_started_pm_pid is None: + return False + + assert type(self._manually_started_pm_pid) == int # noqa: E721 + return True + @property def auxiliary_pids(self): """ @@ -993,9 +1004,6 @@ def start(self, params=[], wait=True, exec_env=None): assert exec_env is None or type(exec_env) == dict # noqa: E721 assert __class__._C_MAX_START_ATEMPTS > 1 - if self.is_started: - return self - if self._port is None: raise InvalidOperationException("Can't start PostgresNode. Port is not defined.") @@ -1014,8 +1022,11 @@ def LOCAL__start_node(): if error and 'does not exist' in error: raise Exception(error) - def LOCAL__raise_cannot_start_node(from_exception, msg): - assert isinstance(from_exception, Exception) + def LOCAL__raise_cannot_start_node( + from_exception: typing.Optional[Exception], + msg: str + ): + assert from_exception is None or isinstance(from_exception, Exception) assert type(msg) == str # noqa: E721 files = self._collect_special_files() raise_from(StartNodeException(msg, files), from_exception) @@ -1074,7 +1085,17 @@ def LOCAL__raise_cannot_start_node__std(from_exception): continue break self._maybe_start_logger() - self.is_started = True + + if not wait: + # Postmaster process is starting in background + self._manually_started_pm_pid = __class__._C_PM_PID__IS_NOT_DETECTED + else: + self._manually_started_pm_pid = self._get_node_state().pid + if self._manually_started_pm_pid is None: + LOCAL__raise_cannot_start_node(None, "Cannot detect postmaster pid.") + + assert type(self._manually_started_pm_pid) == int # noqa: E721 + return self def stop(self, params=[], wait=True): @@ -1088,9 +1109,6 @@ def stop(self, params=[], wait=True): Returns: This instance of :class:`.PostgresNode`. """ - if not self.is_started: - return self - _params = [ self._get_bin_path("pg_ctl"), "-D", self.data_dir, @@ -1100,8 +1118,9 @@ def stop(self, params=[], wait=True): execute_utility2(self.os_ops, _params, self.utils_log_file) + self._manually_started_pm_pid = None + self._maybe_stop_logger() - self.is_started = False return self def kill(self, someone=None): @@ -1112,14 +1131,26 @@ def kill(self, someone=None): someone: A key to the auxiliary process in the auxiliary_pids dictionary. If None, the main PostgreSQL node process will be killed. Defaults to None. """ - if self.is_started: - assert isinstance(self._os_ops, OsOperations) - sig = signal.SIGKILL if os.name != 'nt' else signal.SIGBREAK - if someone is None: - self._os_ops.kill(self.pid, sig) - else: - self._os_ops.kill(self.auxiliary_pids[someone][0], sig) - self.is_started = False + x = self._get_node_state() + assert type(x) == utils.PostgresNodeState # noqa: E721 + + if x.node_status != NodeStatus.Running: + RaiseError.node_err__cant_kill(x.node_status) + assert False + + assert x.node_status == NodeStatus.Running + assert type(x.pid) == int # noqa: E721 + sig = signal.SIGKILL if os.name != 'nt' else signal.SIGBREAK + if someone is None: + self._os_ops.kill(x.pid, sig) + self._manually_started_pm_pid = None + else: + childs = self._get_child_processes(x.pid) + for c in childs: + if c.ptype == someone: + self._os_ops.kill(c.process.pid, sig) + continue + return def restart(self, params=[]): """ diff --git a/src/raise_error.py b/src/raise_error.py index 3603e7d..919cd35 100644 --- a/src/raise_error.py +++ b/src/raise_error.py @@ -45,6 +45,21 @@ def node_err__cant_enumerate_child_processes( raise InvalidOperationException(msg) + @staticmethod + def node_err__cant_kill( + node_status: NodeStatus + ): + assert type(node_status) == NodeStatus # noqa: E721 + + msg = "Can't kill server process. {}.".format( + __class__._map_node_status_to_reason( + node_status, + None, + ) + ) + + raise InvalidOperationException(msg) + @staticmethod def _map_node_status_to_reason( node_status: NodeStatus, diff --git a/tests/test_raise_error.py b/tests/test_raise_error.py index 10416e3..5548199 100644 --- a/tests/test_raise_error.py +++ b/tests/test_raise_error.py @@ -7,7 +7,7 @@ class TestRaiseError: - class tagData001__NodeErr_CantEnumerateChildProcesses: + class tagTestData001: node_status: NodeStatus expected_msg: str @@ -29,12 +29,12 @@ def sign(self) -> str: msg = "status: {}".format(self.node_status) return msg - sm_Data001: typing.List[tagData001__NodeErr_CantEnumerateChildProcesses] = [ - tagData001__NodeErr_CantEnumerateChildProcesses( + sm_Data001: typing.List[tagTestData001] = [ + tagTestData001( NodeStatus.Uninitialized, "Can't enumerate node child processes. Node is not initialized.", ), - tagData001__NodeErr_CantEnumerateChildProcesses( + tagTestData001( NodeStatus.Stopped, "Can't enumerate node child processes. Node is not running.", ), @@ -44,16 +44,16 @@ def sign(self) -> str: params=sm_Data001, ids=[x.sign for x in sm_Data001], ) - def data001(self, request: pytest.FixtureRequest) -> tagData001__NodeErr_CantEnumerateChildProcesses: + def data001(self, request: pytest.FixtureRequest) -> tagTestData001: assert isinstance(request, pytest.FixtureRequest) - assert type(request.param).__name__ == "tagData001__NodeErr_CantEnumerateChildProcesses" + assert type(request.param).__name__ == "tagTestData001" return request.param def test_001__node_err__cant_enumerate_child_processes( self, - data001: tagData001__NodeErr_CantEnumerateChildProcesses, + data001: tagTestData001, ): - assert type(data001) == __class__.tagData001__NodeErr_CantEnumerateChildProcesses # noqa: E721 + assert type(data001) == __class__.tagTestData001 # noqa: E721 with pytest.raises(expected_exception=InvalidOperationException) as x: RaiseError.node_err__cant_enumerate_child_processes( @@ -63,3 +63,38 @@ def test_001__node_err__cant_enumerate_child_processes( assert x is not None assert str(x.value) == data001.expected_msg return + + sm_Data002: typing.List[tagTestData001] = [ + tagTestData001( + NodeStatus.Uninitialized, + "Can't kill server process. Node is not initialized.", + ), + tagTestData001( + NodeStatus.Stopped, + "Can't kill server process. Node is not running.", + ), + ] + + @pytest.fixture( + params=sm_Data002, + ids=[x.sign for x in sm_Data002], + ) + def data002(self, request: pytest.FixtureRequest) -> tagTestData001: + assert isinstance(request, pytest.FixtureRequest) + assert type(request.param).__name__ == "tagTestData001" + return request.param + + def test_002__node_err__cant_kill( + self, + data002: tagTestData001, + ): + assert type(data002) == __class__.tagTestData001 # noqa: E721 + + with pytest.raises(expected_exception=InvalidOperationException) as x: + RaiseError.node_err__cant_kill( + data002.node_status + ) + + assert x is not None + assert str(x.value) == data002.expected_msg + return diff --git a/tests/test_testgres_common.py b/tests/test_testgres_common.py index c83264f..69d07b0 100644 --- a/tests/test_testgres_common.py +++ b/tests/test_testgres_common.py @@ -13,6 +13,7 @@ from src.utils import get_pg_version2 from src.utils import file_tail from src.utils import get_bin_path2 +from src.utils import execute_utility2 from src import ProcessType from src import NodeStatus from src import IsolationLevel @@ -46,6 +47,7 @@ import subprocess import typing import types +import psutil @contextmanager @@ -205,10 +207,63 @@ def test_node_exit(self, node_svc: PostgresNodeService): def test_double_start(self, node_svc: PostgresNodeService): assert isinstance(node_svc, PostgresNodeService) - with __class__.helper__get_node(node_svc).init().start() as node: - # can't start node more than once + with __class__.helper__get_node(node_svc) as node: + node.init() + assert not node.is_started + node.start() + assert node.is_started + + with pytest.raises(expected_exception=StartNodeException) as x: + # can't start node more than once + node.start() + + assert x is not None + assert type(x.value) == StartNodeException # noqa: E721 + assert type(x.value.message) == str # noqa: E721 + + assert x.value.message == "Cannot start node" + + assert node.is_started + + return + + def test_start__manually_stop__start_again(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + + with __class__.helper__get_node(node_svc) as node: + node.init() + assert not node.is_started + + logging.info("Start node") + node.start() + assert node.is_started + assert node.status() == NodeStatus.Running + + logging.info("Stop node manually via pg_ctl") + stop_cmd = [ + node.os_ops.build_path(node.bin_dir, "pg_ctl"), + "stop", + "-D", + node.data_dir, + ] + + execute_utility2( + node.os_ops, + stop_cmd, + node.utils_log_file + ) + + assert node.is_started + assert node.status() == NodeStatus.Stopped + + logging.info("Start node again") node.start() - assert (node.is_started) + assert node.is_started + assert node.status() == NodeStatus.Running + + assert not node.is_started + assert node.status() == NodeStatus.Uninitialized + return def test_uninitialized_start(self, node_svc: PostgresNodeService): assert isinstance(node_svc, PostgresNodeService) @@ -236,6 +291,28 @@ def test_restart(self, node_svc: PostgresNodeService): node.append_conf('pg_hba.conf', 'DUMMY') node.restart() + def test_double_stop(self, node_svc: PostgresNodeService): + assert isinstance(node_svc, PostgresNodeService) + + with __class__.helper__get_node(node_svc) as node: + node.init() + assert not node.is_started + node.start() + assert node.is_started + node.stop() + assert not node.is_started + + with pytest.raises(expected_exception=Exception) as x: + # can't start node more than once + node.stop() + + assert x is not None + assert "Is server running?" in str(x.value) + + assert not node.is_started + + return + def test_reload(self, node_svc: PostgresNodeService): assert isinstance(node_svc, PostgresNodeService) @@ -295,6 +372,145 @@ def test_status(self, node_svc: PostgresNodeService): assert (node.pid == 0) assert (node.status() == NodeStatus.Uninitialized) + def test_kill__is_not_initialized( + self, + node_svc: PostgresNodeService + ): + assert isinstance(node_svc, PostgresNodeService) + + with __class__.helper__get_node(node_svc) as node: + assert isinstance(node, PostgresNode) + assert (node.pid == 0) + assert (node.status() == NodeStatus.Uninitialized) + + with pytest.raises(expected_exception=InvalidOperationException) as x: + node.kill() + + assert x is not None + assert str(x.value) == "Can't kill server process. Node is not initialized." + return + + def test_kill__is_not_running( + self, + node_svc: PostgresNodeService + ): + assert isinstance(node_svc, PostgresNodeService) + + with __class__.helper__get_node(node_svc) as node: + assert isinstance(node, PostgresNode) + assert (node.pid == 0) + assert (node.status() == NodeStatus.Uninitialized) + + node.init() + + try: + with pytest.raises(expected_exception=InvalidOperationException) as x: + node.kill() + + assert x is not None + assert str(x.value) == "Can't kill server process. Node is not running." + finally: + try: + node.cleanup(release_resources=True) + except Exception as e: + logging.error("Exception ({}): {}".format( + type(e).__name__, + e, + )) + return + + def test_kill__ok( + self, + node_svc: PostgresNodeService + ): + assert isinstance(node_svc, PostgresNodeService) + + with __class__.helper__get_node(node_svc) as node: + assert isinstance(node, PostgresNode) + assert (node.pid == 0) + assert (node.status() == NodeStatus.Uninitialized) + + node.init() + assert not node.is_started + node.slow_start() + assert node.is_started + node.kill() + assert not node.is_started + + attempt = 0 + + while True: + if attempt == 60: + raise RuntimeError("Node is not stopped.") + + attempt += 1 + + if attempt > 1: + time.sleep(1) + + s = node.status() + + logging.info("Node status is {}".format(s.name)) + + if s == NodeStatus.Running: + continue + + assert s == NodeStatus.Stopped + break + return + + def test_kill_backgroud_writer__ok( + self, + node_svc: PostgresNodeService + ): + assert isinstance(node_svc, PostgresNodeService) + + with __class__.helper__get_node(node_svc) as node: + assert isinstance(node, PostgresNode) + assert (node.pid == 0) + assert (node.status() == NodeStatus.Uninitialized) + + node.init() + assert not node.is_started + node.slow_start() + assert node.is_started + node_pid = node.pid + assert type(node_pid) == int # noqa: E721 + aux_pids = node.auxiliary_pids + assert type(aux_pids) == dict # noqa: E721 + assert ProcessType.BackgroundWriter in aux_pids + bw_pids = aux_pids[ProcessType.BackgroundWriter] + assert type(bw_pids) == list # noqa: E721 + assert len(bw_pids) == 1 + bw_pid = bw_pids[0] + assert type(bw_pid) == int # noqa: E721 + node.kill(ProcessType.BackgroundWriter) + assert node.is_started + + attempt = 0 + + while True: + if attempt == 60: + raise RuntimeError("Node is not stopped.") + + attempt += 1 + + if attempt > 1: + time.sleep(1) + + try: + psutil.Process(bw_pid) + except psutil.NoSuchProcess: + logging.info("Process is not found") + break + + logging.info("Process is still alive.") + continue + + assert node.is_started + assert node.pid == node_pid + return + def test_child_processes__is_not_initialized( self, node_svc: PostgresNodeService @@ -1544,17 +1760,21 @@ def test_port_conflict(self, node_svc: PostgresNodeService): assert node2._should_free_port assert node2.port == node1.port + node2.init() + assert node2.status() == NodeStatus.Stopped + with pytest.raises( expected_exception=StartNodeException, match=re.escape("Cannot start node after multiple attempts.") ): - node2.init().start() + node2.start() assert node2.port == node1.port assert node2._should_free_port assert proxy.m_DummyPortCurrentUsage == 1 assert proxy.m_DummyPortTotalUsage == C_COUNT_OF_BAD_PORT_USAGE assert not node2.is_started + assert node2.status() == NodeStatus.Stopped # node2 must release our dummyPort (node1.port) assert (proxy.m_DummyPortCurrentUsage == 0) diff --git a/tests/test_testgres_local.py b/tests/test_testgres_local.py index 6018188..c318f68 100644 --- a/tests/test_testgres_local.py +++ b/tests/test_testgres_local.py @@ -315,11 +315,13 @@ def test_port_conflict(self): assert (node2._should_free_port) assert (node2.port == node1.port) + node2.init() + with pytest.raises( expected_exception=StartNodeException, match=re.escape("Cannot start node after multiple attempts.") ): - node2.init().start() + node2.start() assert (node2.port == node1.port) assert (node2._should_free_port) From 224d094f59d32cd1001af7f40045cce0b4c103d8 Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Thu, 8 Jan 2026 01:58:25 +0300 Subject: [PATCH 2/2] PostgresNode::kill is updated (assert) --- src/node.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/node.py b/src/node.py index c10dc28..b651c87 100644 --- a/src/node.py +++ b/src/node.py @@ -1149,6 +1149,7 @@ def kill(self, someone=None): else: childs = self._get_child_processes(x.pid) for c in childs: + assert type(c) == ProcessProxy # noqa: E721 if c.ptype == someone: self._os_ops.kill(c.process.pid, sig) continue