From f5171e66937f638ec0024192b295f88779ef1f7a Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 21 Mar 2025 13:12:32 -0400 Subject: [PATCH 1/3] feat: pause assignment on listener shutdown --- hatchet_sdk/clients/rest_client.py | 9 +++++++ hatchet_sdk/worker/action_listener_process.py | 25 +++++++++++++------ 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/hatchet_sdk/clients/rest_client.py b/hatchet_sdk/clients/rest_client.py index f6458e5a..06594e19 100644 --- a/hatchet_sdk/clients/rest_client.py +++ b/hatchet_sdk/clients/rest_client.py @@ -9,6 +9,7 @@ from hatchet_sdk.clients.rest.api.event_api import EventApi from hatchet_sdk.clients.rest.api.log_api import LogApi from hatchet_sdk.clients.rest.api.step_run_api import StepRunApi +from hatchet_sdk.clients.rest.api.worker_api import WorkerApi from hatchet_sdk.clients.rest.api.workflow_api import WorkflowApi from hatchet_sdk.clients.rest.api.workflow_run_api import WorkflowRunApi from hatchet_sdk.clients.rest.api.workflow_runs_api import WorkflowRunsApi @@ -83,6 +84,7 @@ def __init__(self, host: str, api_key: str, tenant_id: str): self._step_run_api = None self._event_api = None self._log_api = None + self._worker_api: WorkerApi | None = None @property def api_client(self): @@ -102,6 +104,13 @@ def workflow_run_api(self): self._workflow_run_api = WorkflowRunApi(self.api_client) return self._workflow_run_api + @property + def worker_api(self): + if self._worker_api is None: + self._worker_api = WorkerApi(self.api_client) + + return self._worker_api + @property def step_run_api(self): if self._step_run_api is None: diff --git a/hatchet_sdk/worker/action_listener_process.py b/hatchet_sdk/worker/action_listener_process.py index 08508607..539dfbd4 100644 --- a/hatchet_sdk/worker/action_listener_process.py +++ b/hatchet_sdk/worker/action_listener_process.py @@ -8,12 +8,14 @@ import grpc +from hatchet_sdk.client import Client, new_client_raw from hatchet_sdk.clients.dispatcher.action_listener import Action from hatchet_sdk.clients.dispatcher.dispatcher import ( ActionListener, GetActionListenerRequest, new_dispatcher, ) +from hatchet_sdk.clients.rest.models.update_worker_request import UpdateWorkerRequest from hatchet_sdk.contracts.dispatcher_pb2 import ( GROUP_KEY_EVENT_TYPE_STARTED, STEP_EVENT_TYPE_STARTED, @@ -40,11 +42,6 @@ class ActionEvent: "THE TIME TO START THE STEP RUN IS TOO LONG, THE MAIN THREAD MAY BE BLOCKED" ) - -def noop_handler(): - pass - - @dataclass class WorkerActionListenerProcess: name: str @@ -70,9 +67,15 @@ def __post_init__(self): if self.debug: logger.setLevel(logging.DEBUG) + self.client = new_client_raw(self.config, self.debug) + loop = asyncio.get_event_loop() - loop.add_signal_handler(signal.SIGINT, noop_handler) - loop.add_signal_handler(signal.SIGTERM, noop_handler) + loop.add_signal_handler( + signal.SIGINT, lambda: asyncio.create_task(self.pause_task_assignment()) + ) + loop.add_signal_handler( + signal.SIGTERM, lambda: asyncio.create_task(self.pause_task_assignment()) + ) loop.add_signal_handler( signal.SIGQUIT, lambda: asyncio.create_task(self.exit_gracefully()) ) @@ -249,7 +252,15 @@ async def cleanup(self): self.event_queue.put(STOP_LOOP) + async def pause_task_assignment(self) -> None: + await self.client.rest.aio.worker_api.worker_update( + worker=self.listener.worker_id, + update_worker_request=UpdateWorkerRequest(isPaused=True), + ) + async def exit_gracefully(self, skip_unregister=False): + await self.pause_task_assignment() + if self.killing: return From 5648190b16199cd193ffbcc4f35294ce6933f611 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 21 Mar 2025 13:13:52 -0400 Subject: [PATCH 2/3] fix: lint --- hatchet_sdk/worker/action_listener_process.py | 1 + 1 file changed, 1 insertion(+) diff --git a/hatchet_sdk/worker/action_listener_process.py b/hatchet_sdk/worker/action_listener_process.py index 539dfbd4..3abfcbb1 100644 --- a/hatchet_sdk/worker/action_listener_process.py +++ b/hatchet_sdk/worker/action_listener_process.py @@ -42,6 +42,7 @@ class ActionEvent: "THE TIME TO START THE STEP RUN IS TOO LONG, THE MAIN THREAD MAY BE BLOCKED" ) + @dataclass class WorkerActionListenerProcess: name: str From a56bb2df07ca2c6d85081046c520f5a5f97896c8 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 21 Mar 2025 13:14:04 -0400 Subject: [PATCH 3/3] chore: ver --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 8123f1a7..9d866b73 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "0.47.0" +version = "0.47.1" description = "" authors = ["Alexander Belanger "] readme = "README.md"