From 173f2f8837e5fa2c2aef6e5e2195df3061c91065 Mon Sep 17 00:00:00 2001 From: philip okoiokio Date: Fri, 7 Mar 2025 13:17:44 +0100 Subject: [PATCH 1/2] coroutine additions and exits handle --- hatchet_sdk/worker/worker.py | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/hatchet_sdk/worker/worker.py b/hatchet_sdk/worker/worker.py index b6ec1531..4d802cbd 100644 --- a/hatchet_sdk/worker/worker.py +++ b/hatchet_sdk/worker/worker.py @@ -29,6 +29,7 @@ from hatchet_sdk.worker.action_listener_process import worker_action_listener_process from hatchet_sdk.worker.runner.run_loop_manager import WorkerActionRunLoopManager from hatchet_sdk.workflow import WorkflowInterface +from asyncio.exceptions import CancelledError T = TypeVar("T") @@ -124,7 +125,7 @@ def register_workflow(self, workflow: TWorkflow) -> None: sys.exit(1) def create_action_function( - action_func: Callable[..., T] + action_func: Callable[..., T], ) -> Callable[[Context], T]: def action_function(context: Context) -> T: return action_func(workflow, context) @@ -340,7 +341,7 @@ async def exit_gracefully(self) -> None: logger.debug(f"gracefully stopping worker: {self.name}") if self.killing: - return self.exit_forcefully() + return await self.exit_forcefully() self.killing = True @@ -357,20 +358,31 @@ async def exit_gracefully(self) -> None: logger.info("👋") - def exit_forcefully(self) -> None: + async def exit_forcefully(self) -> None: self.killing = True logger.debug(f"forcefully stopping worker: {self.name}") + try: + await self.close() - self.close() + if self.action_listener_process: + self.action_listener_process.kill() # Forcefully kill the process - if self.action_listener_process: - self.action_listener_process.kill() # Forcefully kill the process + logger.info("👋") + # sys.exit( + # 1 + # ) # Exit immediately TODO - should we exit with 1 here, there may be other workers to cleanup - logger.info("👋") - sys.exit( - 1 - ) # Exit immediately TODO - should we exit with 1 here, there may be other workers to cleanup + except CancelledError: + logger.warning("Shutdown process was cancelled, ensuring cleanup...") + raise # Allow proper propagation of cancellation + + except Exception as e: + logger.error(f"Unexpected error during shutdown: {e}") + + finally: + logger.info("Worker cleanup finished, allowing normal exit.") + os._exit(1) def register_on_worker(callable: HatchetCallable[T], worker: Worker) -> None: From d800917c2d67de494b2509d191b10c983ad54440 Mon Sep 17 00:00:00 2001 From: philip okoiokio Date: Sat, 8 Mar 2025 09:56:55 +0100 Subject: [PATCH 2/2] Added the changes from code review --- hatchet_sdk/worker/worker.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/hatchet_sdk/worker/worker.py b/hatchet_sdk/worker/worker.py index 4d802cbd..babd5428 100644 --- a/hatchet_sdk/worker/worker.py +++ b/hatchet_sdk/worker/worker.py @@ -31,6 +31,7 @@ from hatchet_sdk.workflow import WorkflowInterface from asyncio.exceptions import CancelledError + T = TypeVar("T") @@ -313,8 +314,8 @@ async def _check_listener_health(self) -> None: ## Cleanup methods def _setup_signal_handlers(self) -> None: - signal.signal(signal.SIGTERM, self._handle_exit_signal) - signal.signal(signal.SIGINT, self._handle_exit_signal) + signal.signal(signal.SIGTERM, self._handle_force_quit_signal) + signal.signal(signal.SIGINT, self._handle_force_quit_signal) signal.signal(signal.SIGQUIT, self._handle_force_quit_signal) def _handle_exit_signal(self, signum: int, frame: FrameType | None) -> None: @@ -324,7 +325,7 @@ def _handle_exit_signal(self, signum: int, frame: FrameType | None) -> None: def _handle_force_quit_signal(self, signum: int, frame: FrameType | None) -> None: logger.info("received SIGQUIT...") - self.exit_forcefully() + self.loop.create_task(self.exit_forcefully()) async def close(self) -> None: logger.info(f"closing worker '{self.name}'...") @@ -369,9 +370,6 @@ async def exit_forcefully(self) -> None: self.action_listener_process.kill() # Forcefully kill the process logger.info("👋") - # sys.exit( - # 1 - # ) # Exit immediately TODO - should we exit with 1 here, there may be other workers to cleanup except CancelledError: logger.warning("Shutdown process was cancelled, ensuring cleanup...") @@ -382,6 +380,9 @@ async def exit_forcefully(self) -> None: finally: logger.info("Worker cleanup finished, allowing normal exit.") + # sys.exit( + # 1 + # ) # Exit immediately TODO - should we exit with 1 here, there may be other workers to cleanup os._exit(1)