From 06efe24914c92d4a706275eff709d773057047f3 Mon Sep 17 00:00:00 2001 From: NiveditJain Date: Thu, 4 Sep 2025 18:55:11 +0530 Subject: [PATCH 1/4] Add integration test for upserting graph in StateManager - Introduced a new test case in test_upsert_graph.py to validate the upsert_graph functionality of the StateManager. - Implemented a PrintNode class to handle input messages and verify the execution of graph nodes. - Utilized asyncio for non-blocking runtime management and ensured proper cleanup of tasks after execution. --- integration-tests/test_upsert_graph.py | 65 ++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 integration-tests/test_upsert_graph.py diff --git a/integration-tests/test_upsert_graph.py b/integration-tests/test_upsert_graph.py new file mode 100644 index 00000000..5055ad9e --- /dev/null +++ b/integration-tests/test_upsert_graph.py @@ -0,0 +1,65 @@ +import pytest +import asyncio +import threading +from pydantic import BaseModel +from exospherehost import BaseNode, Runtime, StateManager, GraphNodeModel + +@pytest.mark.asyncio +async def test_upsert_graph(running_server): + + class PrintNode(BaseNode): + class Inputs(BaseModel): + message: str + + async def execute(self): + print(self.inputs.message) # type: ignore + + + state_machine_url = running_server.base_url + runtime = Runtime( + namespace="test", + name="test", + nodes=[ + PrintNode + ], + state_manager_uri=state_machine_url, + ) + + # Use asyncio task instead of thread for proper cleanup + runtime_task = None + + try: + # Start runtime as an asyncio task (non-blocking) + runtime_task = asyncio.create_task(runtime._start()) + + # Give runtime time to initialize + await asyncio.sleep(2) + + state_manager = StateManager( + namespace="test", + state_manager_uri=state_machine_url, + ) + data = await state_manager.upsert_graph( + graph_name="test_graph", + graph_nodes=[ + GraphNodeModel( + node_name="PrintNode", + namespace="test", + identifier="node1", + inputs={ + "message": "Hello, world!", + }, + ) + ], + secrets={}, + ) + assert data is not None + + finally: + # Ensure proper cleanup of the runtime task + if runtime_task and not runtime_task.done(): + runtime_task.cancel() + try: + await runtime_task + except asyncio.CancelledError: + pass # Expected when cancelling \ No newline at end of file From 14cceb049bd6b9d876e480092fa2b2bbfa24e382 Mon Sep 17 00:00:00 2001 From: NiveditJain Date: Thu, 4 Sep 2025 18:56:59 +0530 Subject: [PATCH 2/4] Remove unused threading import from test_upsert_graph.py --- integration-tests/test_upsert_graph.py | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/test_upsert_graph.py b/integration-tests/test_upsert_graph.py index 5055ad9e..47b723d1 100644 --- a/integration-tests/test_upsert_graph.py +++ b/integration-tests/test_upsert_graph.py @@ -1,6 +1,5 @@ import pytest import asyncio -import threading from pydantic import BaseModel from exospherehost import BaseNode, Runtime, StateManager, GraphNodeModel From f425acaeb605aad0028e0adb15f3b041a3eba08b Mon Sep 17 00:00:00 2001 From: NiveditJain Date: Thu, 4 Sep 2025 19:17:58 +0530 Subject: [PATCH 3/4] Refactor test_upsert_graph.py to use threading for runtime management - Replaced asyncio task with a threading approach for starting the runtime, ensuring proper initialization without blocking. - Simplified the upsert_graph test case by removing unnecessary cleanup logic related to asyncio tasks. - Maintained the functionality of the test while improving the structure and readability. --- integration-tests/test_upsert_graph.py | 57 +++++++++++--------------- 1 file changed, 23 insertions(+), 34 deletions(-) diff --git a/integration-tests/test_upsert_graph.py b/integration-tests/test_upsert_graph.py index 47b723d1..375a09d4 100644 --- a/integration-tests/test_upsert_graph.py +++ b/integration-tests/test_upsert_graph.py @@ -1,5 +1,6 @@ import pytest import asyncio +import threading from pydantic import BaseModel from exospherehost import BaseNode, Runtime, StateManager, GraphNodeModel @@ -24,41 +25,29 @@ async def execute(self): state_manager_uri=state_machine_url, ) - # Use asyncio task instead of thread for proper cleanup - runtime_task = None - - try: - # Start runtime as an asyncio task (non-blocking) - runtime_task = asyncio.create_task(runtime._start()) - - # Give runtime time to initialize - await asyncio.sleep(2) + thread = threading.Thread(target=runtime.start, daemon=True) + thread.start() + + await asyncio.sleep(2) - state_manager = StateManager( + state_manager = StateManager( + namespace="test", + state_manager_uri=state_machine_url, + ) + + data = await state_manager.upsert_graph( + graph_name="test_graph", + graph_nodes=[ + GraphNodeModel( + node_name="PrintNode", namespace="test", - state_manager_uri=state_machine_url, - ) - data = await state_manager.upsert_graph( - graph_name="test_graph", - graph_nodes=[ - GraphNodeModel( - node_name="PrintNode", - namespace="test", - identifier="node1", - inputs={ - "message": "Hello, world!", - }, - ) - ], + identifier="node1", + inputs={ + "message": "Hello, world!", + }, + ) + ], secrets={}, ) - assert data is not None - - finally: - # Ensure proper cleanup of the runtime task - if runtime_task and not runtime_task.done(): - runtime_task.cancel() - try: - await runtime_task - except asyncio.CancelledError: - pass # Expected when cancelling \ No newline at end of file + + assert data is not None \ No newline at end of file From a6e9d30d12abf2af1bd16f4f2ed38a7f0fdb9fd5 Mon Sep 17 00:00:00 2001 From: NiveditJain Date: Thu, 4 Sep 2025 20:32:22 +0530 Subject: [PATCH 4/4] Add integration test for fan-out functionality in StateManager - Introduced a new test case in test_fan_out.py to validate the fan-out behavior of the StateManager. - Implemented a Node1 class to handle input messages and return a count of executions. - Utilized threading for runtime management and ensured proper initialization without blocking. - Enhanced assertions to verify the success of the graph execution and the correctness of run details. --- integration-tests/test_fan_out.py | 94 ++++++++++++++++++++++++++ integration-tests/test_upsert_graph.py | 3 +- 2 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 integration-tests/test_fan_out.py diff --git a/integration-tests/test_fan_out.py b/integration-tests/test_fan_out.py new file mode 100644 index 00000000..39d34843 --- /dev/null +++ b/integration-tests/test_fan_out.py @@ -0,0 +1,94 @@ +import pytest +import asyncio +import threading +import os +from aiohttp import ClientSession +from pydantic import BaseModel +from exospherehost import BaseNode, Runtime, StateManager, GraphNodeModel + +@pytest.mark.asyncio +async def test_upsert_graph(running_server): + + class Node1(BaseNode): + class Inputs(BaseModel): + message: str + + class Outputs(BaseModel): + count: str + + async def execute(self): + return [self.Outputs(count=str(i)) for i in range(10)] + + + state_machine_url = running_server.base_url + runtime = Runtime( + namespace="test", + name="test", + nodes=[ + Node1 + ], + state_manager_uri=state_machine_url, + ) + + thread = threading.Thread(target=runtime.start, daemon=True) + thread.start() + + await asyncio.sleep(2) + + state_manager = StateManager( + namespace="test", + state_manager_uri=state_machine_url, + ) + + data = await state_manager.upsert_graph( + graph_name="test_graph", + graph_nodes=[ + GraphNodeModel( + node_name="Node1", + namespace="test", + identifier="node1", + inputs={ + "message": "Hello, world!", + }, + ) + ], + secrets={}, + ) + + assert data is not None + assert data["validation_status"] == "VALID" + + trigger = await state_manager.trigger( + graph_name="test_graph", + inputs={}, + ) + + assert trigger is not None + assert "run_id" in trigger + run_id = trigger["run_id"] + + await asyncio.sleep(30) + + run_object = None + + async with ClientSession() as session: + async with session.get(f"{state_machine_url}/v0/namespace/test/runs/1/100", headers={"x-api-key": os.getenv("EXOSPHERE_API_KEY")}) as response: # type: ignore + assert response.status == 200 + data = await response.json() + + assert "runs" in data + + for run in data["runs"]: + if run["run_id"] == run_id: + run_object = run + break + + assert run_object is not None + assert run_object["run_id"] == run_id + assert run_object["graph_name"] == "test_graph" + assert run_object["success_count"] == 10 + assert run_object["pending_count"] == 0 + assert run_object["errored_count"] == 0 + assert run_object["retried_count"] == 0 + assert run_object["total_count"] == 10 + assert run_object["status"] == "SUCCESS" diff --git a/integration-tests/test_upsert_graph.py b/integration-tests/test_upsert_graph.py index 375a09d4..9bb552bc 100644 --- a/integration-tests/test_upsert_graph.py +++ b/integration-tests/test_upsert_graph.py @@ -50,4 +50,5 @@ async def execute(self): secrets={}, ) - assert data is not None \ No newline at end of file + assert data is not None + assert data["validation_status"] == "VALID" \ No newline at end of file