From 6b7a20985a624ec65a38d071b94020fc0aac49c2 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 14 Feb 2025 14:26:38 -0500 Subject: [PATCH 1/9] feat: add sync spawn method --- hatchet_sdk/context/context.py | 41 ++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/hatchet_sdk/context/context.py b/hatchet_sdk/context/context.py index f20acd66..2584949b 100644 --- a/hatchet_sdk/context/context.py +++ b/hatchet_sdk/context/context.py @@ -403,3 +403,44 @@ def fetch_run_failures(self) -> list[dict[str, StrictStr]]: for step_run in job_run.step_runs if step_run.error and step_run.step ] + + @tenacity_retry + def spawn_workflow( + self, + workflow_name: str, + input: dict[str, Any] = {}, + key: str | None = None, + options: ChildTriggerWorkflowOptions | None = None, + ) -> WorkflowRunRef: + worker_id = self.worker.id() + trigger_options = self._prepare_workflow_options(key, options, worker_id) + + return self.admin_client.run_workflow(workflow_name, input, trigger_options) + + @tenacity_retry + def spawn_workflows( + self, child_workflow_runs: list[ChildWorkflowRunDict] + ) -> list[WorkflowRunRef]: + + if len(child_workflow_runs) == 0: + raise Exception("no child workflows to spawn") + + worker_id = self.worker.id() + + bulk_trigger_workflow_runs: list[WorkflowRunDict] = [] + for child_workflow_run in child_workflow_runs: + workflow_name = child_workflow_run["workflow_name"] + input = child_workflow_run["input"] + + key = child_workflow_run.get("key") + options = child_workflow_run.get("options", {}) + + trigger_options = self._prepare_workflow_options(key, options, worker_id) + + bulk_trigger_workflow_runs.append( + WorkflowRunDict( + workflow_name=workflow_name, input=input, options=trigger_options + ) + ) + + return self.admin_client.run_workflows(bulk_trigger_workflow_runs) From 27e5f7ac9a586e92b2c417ca26c0f1d1ac44e971 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 14 Feb 2025 14:29:21 -0500 Subject: [PATCH 2/9] feat: example --- examples/fanout_sync/test_fanout.py | 21 ++++++++++ examples/fanout_sync/trigger.py | 25 ++++++++++++ examples/fanout_sync/worker.py | 60 +++++++++++++++++++++++++++++ pyproject.toml | 1 + 4 files changed, 107 insertions(+) create mode 100644 examples/fanout_sync/test_fanout.py create mode 100644 examples/fanout_sync/trigger.py create mode 100644 examples/fanout_sync/worker.py diff --git a/examples/fanout_sync/test_fanout.py b/examples/fanout_sync/test_fanout.py new file mode 100644 index 00000000..4bdf61d9 --- /dev/null +++ b/examples/fanout_sync/test_fanout.py @@ -0,0 +1,21 @@ +import pytest + +from hatchet_sdk import Hatchet, Worker + + +# requires scope module or higher for shared event loop +@pytest.mark.asyncio(scope="session") +@pytest.mark.parametrize("worker", ["fanout"], indirect=True) +async def test_run(hatchet: Hatchet, worker: Worker) -> None: + run = hatchet.admin.run_workflow("Parent", {"n": 2}) + result = await run.result() + assert len(result["spawn"]["results"]) == 2 + + +# requires scope module or higher for shared event loop +@pytest.mark.asyncio(scope="session") +@pytest.mark.parametrize("worker", ["fanout"], indirect=True) +async def test_run2(hatchet: Hatchet, worker: Worker) -> None: + run = hatchet.admin.run_workflow("Parent", {"n": 2}) + result = await run.result() + assert len(result["spawn"]["results"]) == 2 diff --git a/examples/fanout_sync/trigger.py b/examples/fanout_sync/trigger.py new file mode 100644 index 00000000..c34d01b3 --- /dev/null +++ b/examples/fanout_sync/trigger.py @@ -0,0 +1,25 @@ +import asyncio +import base64 +import json +import os + +from dotenv import load_dotenv + +from hatchet_sdk import new_client +from hatchet_sdk.clients.admin import TriggerWorkflowOptions +from hatchet_sdk.clients.run_event_listener import StepRunEventType + + +async def main() -> None: + load_dotenv() + hatchet = new_client() + + hatchet.admin.run_workflow( + "Parent", + {"test": "test"}, + options={"additional_metadata": {"hello": "moon"}}, + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/fanout_sync/worker.py b/examples/fanout_sync/worker.py new file mode 100644 index 00000000..428414df --- /dev/null +++ b/examples/fanout_sync/worker.py @@ -0,0 +1,60 @@ +from typing import Any + +from dotenv import load_dotenv + +from hatchet_sdk import Context, Hatchet + +load_dotenv() + +hatchet = Hatchet(debug=True) + + +@hatchet.workflow(on_events=["parent:create"]) +class SyncFanoutParent: + @hatchet.step(timeout="5m") + def spawn(self, context: Context) -> dict[str, Any]: + print("spawning child") + + context.put_stream("spawning...") + results = [] + + n = context.workflow_input().get("n", 100) + + for i in range(n): + results.append( + ( + context.spawn_workflow( + "Child", + {"a": str(i)}, + key=f"child{i}", + options={"additional_metadata": {"hello": "earth"}}, + ) + ) + ) + + +@hatchet.workflow(on_events=["child:create"]) +class SyncFanoutChild: + @hatchet.step() + def process(self, context: Context) -> dict[str, str]: + a = context.workflow_input()["a"] + print(f"child process {a}") + context.put_stream("child 1...") + return {"status": "success " + a} + + @hatchet.step() + def process2(self, context: Context) -> dict[str, str]: + print("child process2") + context.put_stream("child 2...") + return {"status2": "success"} + + +def main() -> None: + worker = hatchet.worker("sync-fanout-worker", max_runs=40) + worker.register_workflow(SyncFanoutParent()) + worker.register_workflow(SyncFanoutChild()) + worker.start() + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 29cc6578..13613ab1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -111,6 +111,7 @@ explicit_package_bases = true api = "examples.api.api:main" async = "examples.async.worker:main" fanout = "examples.fanout.worker:main" +fanout_sync = "examples.fanout_sync.worker:main" cancellation = "examples.cancellation.worker:main" concurrency_limit = "examples.concurrency_limit.worker:main" concurrency_limit_rr = "examples.concurrency_limit_rr.worker:main" From 152f31ee966c20b10f39d929cc718338bc37ee61 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 14 Feb 2025 16:37:48 -0500 Subject: [PATCH 3/9] fix: sync workflow run + sync_result --- hatchet_sdk/clients/workflow_listener.py | 10 +++++++--- hatchet_sdk/utils/aio_utils.py | 4 ++-- hatchet_sdk/workflow_run.py | 16 +++++++++------- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/hatchet_sdk/clients/workflow_listener.py b/hatchet_sdk/clients/workflow_listener.py index b1131587..8bf71a3c 100644 --- a/hatchet_sdk/clients/workflow_listener.py +++ b/hatchet_sdk/clients/workflow_listener.py @@ -75,6 +75,12 @@ class PooledWorkflowRunListener: interrupter: asyncio.Task = None def __init__(self, config: ClientConfig): + try: + asyncio.get_running_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + conn = new_conn(config, True) self.client = DispatcherStub(conn) self.token = config.token @@ -260,12 +266,10 @@ async def _retry_subscribe(self): if self.curr_requester != 0: self.requests.put_nowait(self.curr_requester) - listener = self.client.SubscribeToWorkflowRuns( + return self.client.SubscribeToWorkflowRuns( self._request(), metadata=get_metadata(self.token), ) - - return listener except grpc.RpcError as e: if e.code() == grpc.StatusCode.UNAVAILABLE: retries = retries + 1 diff --git a/hatchet_sdk/utils/aio_utils.py b/hatchet_sdk/utils/aio_utils.py index 3f7ac3f3..459205f1 100644 --- a/hatchet_sdk/utils/aio_utils.py +++ b/hatchet_sdk/utils/aio_utils.py @@ -92,7 +92,7 @@ def __init__(self) -> None: self.loop = asyncio.new_event_loop() self.thread = Thread(target=self.run_loop_in_thread, args=(self.loop,)) - def __enter__(self) -> asyncio.AbstractEventLoop: + def __enter__(self, *a, **kw) -> asyncio.AbstractEventLoop: """ Starts the thread running the event loop when entering the context. @@ -102,7 +102,7 @@ def __enter__(self) -> asyncio.AbstractEventLoop: self.thread.start() return self.loop - def __exit__(self) -> None: + def __exit__(self, *a, **kw) -> None: """ Stops the event loop and joins the thread when exiting the context. """ diff --git a/hatchet_sdk/workflow_run.py b/hatchet_sdk/workflow_run.py index 51a23821..064f6741 100644 --- a/hatchet_sdk/workflow_run.py +++ b/hatchet_sdk/workflow_run.py @@ -32,16 +32,18 @@ def result(self) -> Coroutine: return self.workflow_listener.result(self.workflow_run_id) def sync_result(self) -> dict: + coro = self.workflow_listener.result(self.workflow_run_id) loop = get_active_event_loop() + if loop is None: - with EventLoopThread() as loop: - coro = self.workflow_listener.result(self.workflow_run_id) - future = asyncio.run_coroutine_threadsafe(coro, loop) - return future.result() + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(coro) + finally: + asyncio.set_event_loop(None) else: - coro = self.workflow_listener.result(self.workflow_run_id) - future = asyncio.run_coroutine_threadsafe(coro, loop) - return future.result() + return loop.run_until_complete(coro) T = TypeVar("T") From 0bd6e560bcc219be33f57e6015109bd633082c47 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 14 Feb 2025 16:37:53 -0500 Subject: [PATCH 4/9] fix: examples --- examples/fanout_sync/trigger.py | 2 +- examples/fanout_sync/worker.py | 26 ++++++++++++-------------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/examples/fanout_sync/trigger.py b/examples/fanout_sync/trigger.py index c34d01b3..8bc0d9a7 100644 --- a/examples/fanout_sync/trigger.py +++ b/examples/fanout_sync/trigger.py @@ -15,7 +15,7 @@ async def main() -> None: hatchet = new_client() hatchet.admin.run_workflow( - "Parent", + "SyncFanoutParent", {"test": "test"}, options={"additional_metadata": {"hello": "moon"}}, ) diff --git a/examples/fanout_sync/worker.py b/examples/fanout_sync/worker.py index 428414df..9a5b51a8 100644 --- a/examples/fanout_sync/worker.py +++ b/examples/fanout_sync/worker.py @@ -3,6 +3,7 @@ from dotenv import load_dotenv from hatchet_sdk import Context, Hatchet +from hatchet_sdk.workflow_run import WorkflowRunRef load_dotenv() @@ -15,16 +16,15 @@ class SyncFanoutParent: def spawn(self, context: Context) -> dict[str, Any]: print("spawning child") - context.put_stream("spawning...") - results = [] + results: list[WorkflowRunRef] = [] - n = context.workflow_input().get("n", 100) + n = context.workflow_input().get("n", 5) for i in range(n): results.append( ( context.spawn_workflow( - "Child", + "SyncFanoutChild", {"a": str(i)}, key=f"child{i}", options={"additional_metadata": {"hello": "earth"}}, @@ -32,21 +32,19 @@ def spawn(self, context: Context) -> dict[str, Any]: ) ) + results = [ + r.sync_result() + for r in results + ] + + print(f"results {results}") + @hatchet.workflow(on_events=["child:create"]) class SyncFanoutChild: @hatchet.step() def process(self, context: Context) -> dict[str, str]: - a = context.workflow_input()["a"] - print(f"child process {a}") - context.put_stream("child 1...") - return {"status": "success " + a} - - @hatchet.step() - def process2(self, context: Context) -> dict[str, str]: - print("child process2") - context.put_stream("child 2...") - return {"status2": "success"} + return {"status": "success " + context.workflow_input()["a"]} def main() -> None: From 9b970f1a8090a2315f69baef44febe4201e4a154 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 14 Feb 2025 16:39:06 -0500 Subject: [PATCH 5/9] fix: lint --- examples/fanout_sync/worker.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/examples/fanout_sync/worker.py b/examples/fanout_sync/worker.py index 9a5b51a8..793da856 100644 --- a/examples/fanout_sync/worker.py +++ b/examples/fanout_sync/worker.py @@ -16,12 +16,12 @@ class SyncFanoutParent: def spawn(self, context: Context) -> dict[str, Any]: print("spawning child") - results: list[WorkflowRunRef] = [] + runs: list[WorkflowRunRef] = [] n = context.workflow_input().get("n", 5) for i in range(n): - results.append( + runs.append( ( context.spawn_workflow( "SyncFanoutChild", @@ -32,13 +32,12 @@ def spawn(self, context: Context) -> dict[str, Any]: ) ) - results = [ - r.sync_result() - for r in results - ] + results = [r.sync_result() for r in runs] print(f"results {results}") + return {"results": results} + @hatchet.workflow(on_events=["child:create"]) class SyncFanoutChild: From 28f085cc544136c315a9a184a15f8e8ae1f67f8b Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 14 Feb 2025 16:41:22 -0500 Subject: [PATCH 6/9] feat: bulk spawn --- examples/fanout_sync/worker.py | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/examples/fanout_sync/worker.py b/examples/fanout_sync/worker.py index 793da856..a2020844 100644 --- a/examples/fanout_sync/worker.py +++ b/examples/fanout_sync/worker.py @@ -16,21 +16,19 @@ class SyncFanoutParent: def spawn(self, context: Context) -> dict[str, Any]: print("spawning child") - runs: list[WorkflowRunRef] = [] - n = context.workflow_input().get("n", 5) - for i in range(n): - runs.append( - ( - context.spawn_workflow( - "SyncFanoutChild", - {"a": str(i)}, - key=f"child{i}", - options={"additional_metadata": {"hello": "earth"}}, - ) - ) - ) + runs = context.spawn_workflows( + [ + { + "workflow_name": "SyncFanoutChild", + "input": {"a": str(i)}, + "key": f"child{i}", + "options": {"additional_metadata": {"hello": "earth"}}, + } + for i in range(n) + ] + ) results = [r.sync_result() for r in runs] From 209559acf5e2838eea332936e6626b5576adeb7a Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 14 Feb 2025 16:44:23 -0500 Subject: [PATCH 7/9] chore: ver --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 13613ab1..7fbdc77a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "0.46.1" +version = "0.47.0" description = "" authors = ["Alexander Belanger "] readme = "README.md" From 17a05f53d5b8667f0b5fe0e613734aff95ab5218 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 14 Feb 2025 16:51:19 -0500 Subject: [PATCH 8/9] fix: test --- examples/fanout_sync/test_fanout.py | 21 --------------------- examples/fanout_sync/test_fanout_sync.py | 11 +++++++++++ 2 files changed, 11 insertions(+), 21 deletions(-) delete mode 100644 examples/fanout_sync/test_fanout.py create mode 100644 examples/fanout_sync/test_fanout_sync.py diff --git a/examples/fanout_sync/test_fanout.py b/examples/fanout_sync/test_fanout.py deleted file mode 100644 index 4bdf61d9..00000000 --- a/examples/fanout_sync/test_fanout.py +++ /dev/null @@ -1,21 +0,0 @@ -import pytest - -from hatchet_sdk import Hatchet, Worker - - -# requires scope module or higher for shared event loop -@pytest.mark.asyncio(scope="session") -@pytest.mark.parametrize("worker", ["fanout"], indirect=True) -async def test_run(hatchet: Hatchet, worker: Worker) -> None: - run = hatchet.admin.run_workflow("Parent", {"n": 2}) - result = await run.result() - assert len(result["spawn"]["results"]) == 2 - - -# requires scope module or higher for shared event loop -@pytest.mark.asyncio(scope="session") -@pytest.mark.parametrize("worker", ["fanout"], indirect=True) -async def test_run2(hatchet: Hatchet, worker: Worker) -> None: - run = hatchet.admin.run_workflow("Parent", {"n": 2}) - result = await run.result() - assert len(result["spawn"]["results"]) == 2 diff --git a/examples/fanout_sync/test_fanout_sync.py b/examples/fanout_sync/test_fanout_sync.py new file mode 100644 index 00000000..c0ef04b7 --- /dev/null +++ b/examples/fanout_sync/test_fanout_sync.py @@ -0,0 +1,11 @@ +import pytest + +from hatchet_sdk import Hatchet, Worker + + +# requires scope module or higher for shared event loop +@pytest.mark.parametrize("worker", ["fanout_sync"], indirect=True) +def test_run(hatchet: Hatchet, worker: Worker) -> None: + run = hatchet.admin.run_workflow("SyncFanoutParent", {"n": 2}) + result = run.sync_result() + assert len(result["spawn"]["results"]) == 2 From 4911afa2ac53c9beba1b5f883addb9271c5ccac7 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 14 Feb 2025 16:52:24 -0500 Subject: [PATCH 9/9] fix: lint --- examples/fanout_sync/trigger.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/examples/fanout_sync/trigger.py b/examples/fanout_sync/trigger.py index 8bc0d9a7..d5ac99b8 100644 --- a/examples/fanout_sync/trigger.py +++ b/examples/fanout_sync/trigger.py @@ -1,13 +1,8 @@ import asyncio -import base64 -import json -import os from dotenv import load_dotenv from hatchet_sdk import new_client -from hatchet_sdk.clients.admin import TriggerWorkflowOptions -from hatchet_sdk.clients.run_event_listener import StepRunEventType async def main() -> None: