Skip to content
This repository was archived by the owner on Mar 26, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions hatchet_sdk/clients/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from hatchet_sdk.clients.rest.api.event_api import EventApi
from hatchet_sdk.clients.rest.api.log_api import LogApi
from hatchet_sdk.clients.rest.api.step_run_api import StepRunApi
from hatchet_sdk.clients.rest.api.worker_api import WorkerApi
from hatchet_sdk.clients.rest.api.workflow_api import WorkflowApi
from hatchet_sdk.clients.rest.api.workflow_run_api import WorkflowRunApi
from hatchet_sdk.clients.rest.api.workflow_runs_api import WorkflowRunsApi
Expand Down Expand Up @@ -83,6 +84,7 @@ def __init__(self, host: str, api_key: str, tenant_id: str):
self._step_run_api = None
self._event_api = None
self._log_api = None
self._worker_api: WorkerApi | None = None

@property
def api_client(self):
Expand All @@ -102,6 +104,13 @@ def workflow_run_api(self):
self._workflow_run_api = WorkflowRunApi(self.api_client)
return self._workflow_run_api

@property
def worker_api(self):
if self._worker_api is None:
self._worker_api = WorkerApi(self.api_client)

return self._worker_api

@property
def step_run_api(self):
if self._step_run_api is None:
Expand Down
24 changes: 18 additions & 6 deletions hatchet_sdk/worker/action_listener_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@

import grpc

from hatchet_sdk.client import Client, new_client_raw
from hatchet_sdk.clients.dispatcher.action_listener import Action
from hatchet_sdk.clients.dispatcher.dispatcher import (
ActionListener,
GetActionListenerRequest,
new_dispatcher,
)
from hatchet_sdk.clients.rest.models.update_worker_request import UpdateWorkerRequest
from hatchet_sdk.contracts.dispatcher_pb2 import (
GROUP_KEY_EVENT_TYPE_STARTED,
STEP_EVENT_TYPE_STARTED,
Expand Down Expand Up @@ -41,10 +43,6 @@ class ActionEvent:
)


def noop_handler():
pass


@dataclass
class WorkerActionListenerProcess:
name: str
Expand All @@ -70,9 +68,15 @@ def __post_init__(self):
if self.debug:
logger.setLevel(logging.DEBUG)

self.client = new_client_raw(self.config, self.debug)

loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, noop_handler)
loop.add_signal_handler(signal.SIGTERM, noop_handler)
loop.add_signal_handler(
signal.SIGINT, lambda: asyncio.create_task(self.pause_task_assignment())
)
loop.add_signal_handler(
signal.SIGTERM, lambda: asyncio.create_task(self.pause_task_assignment())
)
loop.add_signal_handler(
signal.SIGQUIT, lambda: asyncio.create_task(self.exit_gracefully())
)
Expand Down Expand Up @@ -249,7 +253,15 @@ async def cleanup(self):

self.event_queue.put(STOP_LOOP)

async def pause_task_assignment(self) -> None:
await self.client.rest.aio.worker_api.worker_update(
worker=self.listener.worker_id,
update_worker_request=UpdateWorkerRequest(isPaused=True),
)

async def exit_gracefully(self, skip_unregister=False):
await self.pause_task_assignment()

if self.killing:
return

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "0.47.0"
version = "0.47.1"
description = ""
authors = ["Alexander Belanger <alexander@hatchet.run>"]
readme = "README.md"
Expand Down