Merged
20 commits
c9d18d7
Add prune and re-enqueue signal functionality
NiveditJain Aug 30, 2025
e6367af
Fix typos in logging messages and comments
NiveditJain Aug 30, 2025
bd0a898
Update state-manager/app/models/signal_models.py
NiveditJain Aug 30, 2025
fc9c14c
Update state-manager/app/controller/re_queue_after_singal.py
NiveditJain Aug 30, 2025
4cc63fe
Update state-manager/app/controller/re_queue_after_singal.py
NiveditJain Aug 30, 2025
b6127ff
Update state-manager/app/controller/prune_signal.py
NiveditJain Aug 30, 2025
87870b0
Refactor re-queue after signal functionality and update state model
NiveditJain Aug 30, 2025
94d740f
Merge branch 'signals' of https://github.com/NiveditJain/exospherehos…
NiveditJain Aug 30, 2025
c689a8e
Added import for time module in re_queue_after_signal.py to support t…
NiveditJain Aug 30, 2025
55a4754
Add unit tests for prune and re-enqueue signal functionality
NiveditJain Aug 30, 2025
e5d61d8
Refactor test imports for prune and re-enqueue signal unit tests
NiveditJain Aug 30, 2025
d84b6ab
Implement prune and requeue signal functionality
NiveditJain Aug 30, 2025
ced39b9
Add tests for PruneSingal and ReQueueAfterSingal functionality
NiveditJain Aug 30, 2025
b018b03
Fix signal naming inconsistencies and enhance signal functionality
NiveditJain Aug 30, 2025
5d66297
Correct signal naming in tests and enhance exception handling
NiveditJain Aug 30, 2025
ef84f0c
Enhance validation in ReEnqueueAfterRequestModel tests
NiveditJain Aug 30, 2025
74f51a6
Add Signals documentation and update navigation in mkdocs.yml
NiveditJain Aug 30, 2025
c13b530
Update prune_signal status check to validate against QUEUED state
NiveditJain Aug 30, 2025
49e2eb4
fixed all failing tests
NiveditJain Aug 30, 2025
ea760a1
namespace check would be added as a seprate unit later to take care o…
NiveditJain Aug 30, 2025
131 changes: 131 additions & 0 deletions docs/docs/exosphere/signals.md
@@ -0,0 +1,131 @@
# Signals

!!! beta "Beta Feature"
    Signals are currently available in beta. The API and functionality may change in future releases.

Signals are a mechanism in Exosphere for controlling workflow execution flow and state management. They allow nodes to communicate with the state manager to perform specific actions like pruning states or requeuing them after a delay.

## Overview

Signals are implemented as exceptions that should be raised from within node execution. When a signal is raised, the runtime automatically handles the communication with the state manager to perform the requested action.
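
The control flow described above can be sketched without the SDK. In the minimal example below, `DemoPruneSignal`, `DemoReQueueAfterSignal`, `node`, and `run_node` are hypothetical stand-ins (none of them are part of `exospherehost`). It only illustrates how raising an exception hands the decision to the caller, under the assumption that the runtime maps each signal to one action.

```python
import asyncio
from datetime import timedelta


class DemoPruneSignal(Exception):
    """Hypothetical stand-in for PruneSignal; carries optional data."""

    def __init__(self, data=None):
        self.data = data if data is not None else {}
        super().__init__("prune requested")


class DemoReQueueAfterSignal(Exception):
    """Hypothetical stand-in for ReQueueAfterSignal; carries a delay."""

    def __init__(self, delay: timedelta):
        self.delay = delay
        super().__init__("requeue requested")


async def node(value: str) -> str:
    # A node body: return normally, or raise a signal to change the flow.
    if value == "invalid":
        raise DemoPruneSignal({"reason": "invalid_data"})
    if value == "busy":
        raise DemoReQueueAfterSignal(timedelta(seconds=30))
    return "processed"


async def run_node(value: str) -> tuple[str, object]:
    # Assumed runtime behaviour: translate each outcome into one action.
    try:
        return ("executed", await node(value))
    except DemoPruneSignal as signal:
        return ("prune", signal.data)
    except DemoReQueueAfterSignal as signal:
        return ("requeue", signal.delay)


results = [asyncio.run(run_node(v)) for v in ("ok", "invalid", "busy")]
print(results)
```

The node never talks to the state manager itself. It only raises, and the caller decides what request to make.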

## Available Signals

### PruneSignal

The `PruneSignal` is used to permanently remove a state from the workflow execution. This is typically used when a node determines that the current execution path should be terminated.

#### Usage

```python
from exospherehost import BaseNode, PruneSignal
from pydantic import BaseModel

class MyNode(BaseNode):
    class Inputs(BaseModel):
        data: str

    class Outputs(BaseModel):
        result: str

    async def execute(self, inputs: Inputs) -> Outputs:
        if inputs.data == "invalid":
            # Prune the state with optional data
            raise PruneSignal({"reason": "invalid_data", "error": "Data validation failed"})

        return self.Outputs(result="processed")
```

#### Parameters

- `data` (dict[str, Any], optional): Additional data to include with the prune operation. Defaults to an empty dictionary.

### ReQueueAfterSignal

The `ReQueueAfterSignal` is used to requeue a state for execution after a specified time delay. This is useful for implementing retry logic, scheduled tasks, or rate limiting.

#### Usage

```python
from datetime import timedelta

from exospherehost import BaseNode, ReQueueAfterSignal
from pydantic import BaseModel

class RetryNode(BaseNode):
    class Inputs(BaseModel):
        retry_count: int
        data: str

    class Outputs(BaseModel):
        result: str

    async def execute(self, inputs: Inputs) -> Outputs:
        if inputs.retry_count < 3:
            # Requeue after 5 minutes
            raise ReQueueAfterSignal(timedelta(minutes=5))

        return self.Outputs(result="completed")
```

#### Parameters

- `delay` (timedelta): The amount of time to wait before requeuing the state. Must be greater than 0.
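
Per the SDK source in this change, a non-positive delay is rejected and the delay is sent to the state manager as an integer number of milliseconds under the `enqueue_after` key. The helper name `requeue_body` below is hypothetical, and it raises `ValueError` where the SDK raises a plain `Exception`. Treat it as a sketch of the conversion, not as the SDK's API.

```python
from datetime import timedelta


def requeue_body(delay: timedelta) -> dict[str, int]:
    # Mirror the documented rule: the delay must be strictly positive.
    if delay.total_seconds() <= 0:
        raise ValueError("Delay must be greater than 0")
    # Convert to whole milliseconds, truncating any sub-millisecond remainder.
    return {"enqueue_after": int(delay.total_seconds() * 1000)}


print(requeue_body(timedelta(minutes=5)))    # {'enqueue_after': 300000}
print(requeue_body(timedelta(seconds=1.5)))  # {'enqueue_after': 1500}
```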

## Important Notes

1. **Do not catch signals**: Signals are designed to bubble up to the runtime for handling. Do not catch these exceptions in your node code.

2. **Automatic handling**: The runtime automatically sends signals to the state manager when they are raised.

3. **State lifecycle**: Signals affect the state's lifecycle in the state manager:
    - `PruneSignal`: Sets state status to `PRUNED`
    - `ReQueueAfterSignal`: Sets state status to `CREATED` and schedules requeue

## Error Handling

If signal sending fails (e.g., network issues), the runtime will log the error and continue processing other states. The failed signal will not be retried automatically.
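
The behaviour described above (log the failure, move on, no retry) can be pictured with a small loop. Everything in this sketch is hypothetical: `send_signal` simulates a network failure for one state, and `process` stands in for the runtime's handling. It is not the runtime's actual code.

```python
import asyncio
import logging

logger = logging.getLogger("signals-demo")


async def send_signal(state_id: str) -> None:
    # Hypothetical sender: pretend the request for "s2" hits a network error.
    if state_id == "s2":
        raise ConnectionError(f"could not reach state manager for {state_id}")


async def process(state_ids: list[str]) -> dict[str, str]:
    outcome: dict[str, str] = {}
    for state_id in state_ids:
        try:
            await send_signal(state_id)
            outcome[state_id] = "sent"
        except Exception as exc:
            # Log and move on; nothing here retries the failed signal.
            logger.error("Failed to send signal for %s: %s", state_id, exc)
            outcome[state_id] = "failed"
    return outcome


outcome = asyncio.run(process(["s1", "s2", "s3"]))
print(outcome)
```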

## Examples

### Conditional Pruning

```python
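from datetime import datetime

from exospherehost import BaseNode, PruneSignal
from pydantic import BaseModel

class ValidationNode(BaseNode):
    class Inputs(BaseModel):
        user_id: str
        data: dict

    class Outputs(BaseModel):
        validated: bool

    async def execute(self, inputs: Inputs) -> Outputs:
        # _validate_user is a helper you implement yourself (not shown here)
        if not self._validate_user(inputs.user_id):
            raise PruneSignal({
                "reason": "invalid_user",
                "user_id": inputs.user_id,
                "timestamp": datetime.now().isoformat()
            })

        return self.Outputs(validated=True)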
```

### Polling

```python
from datetime import timedelta

from exospherehost import BaseNode, PruneSignal, ReQueueAfterSignal
from pydantic import BaseModel

class PollingNode(BaseNode):
    class Inputs(BaseModel):
        job_id: str

    class Outputs(BaseModel):
        # Assumed output shape; adjust to your job's result type
        result: str

    async def execute(self, inputs: Inputs) -> Outputs:
        # _check_job_status and _get_job_result are helpers you implement yourself
        job_status = await self._check_job_status(inputs.job_id)

        if job_status == "completed":
            result = await self._get_job_result(inputs.job_id)
            return self.Outputs(result=result)
        elif job_status == "failed":
            # Job failed, prune the state
            raise PruneSignal({
                "reason": "job_failed",
                "job_id": inputs.job_id
            })
        else:
            # Job still running, poll again in 30 seconds
            raise ReQueueAfterSignal(timedelta(seconds=30))
```
2 changes: 2 additions & 0 deletions docs/mkdocs.yml
@@ -103,6 +103,7 @@ plugins:
- exosphere/create-graph.md
- exosphere/trigger-graph.md
- exosphere/dashboard.md
- exosphere/signals.md
- exosphere/architecture.md

markdown_extensions:
@@ -130,4 +131,5 @@ nav:
- Create Graph: exosphere/create-graph.md
- Trigger Graph: exosphere/trigger-graph.md
- Dashboard: exosphere/dashboard.md
- Signals: exosphere/signals.md
- Architecture: exosphere/architecture.md
3 changes: 2 additions & 1 deletion python-sdk/exospherehost/__init__.py
@@ -38,7 +38,8 @@ async def execute(self, inputs: Inputs) -> Outputs:
from .runtime import Runtime
from .node.BaseNode import BaseNode
from .statemanager import StateManager, TriggerState
from .signals import PruneSignal, ReQueueAfterSignal

VERSION = __version__

-__all__ = ["Runtime", "BaseNode", "StateManager", "TriggerState", "VERSION"]
+__all__ = ["Runtime", "BaseNode", "StateManager", "TriggerState", "VERSION", "PruneSignal", "ReQueueAfterSignal"]
2 changes: 1 addition & 1 deletion python-sdk/exospherehost/_version.py
@@ -1 +1 @@
-version = "0.0.2b1"
+version = "0.0.2b2"
23 changes: 23 additions & 0 deletions python-sdk/exospherehost/runtime.py
@@ -8,6 +8,7 @@
from pydantic import BaseModel
from .node.BaseNode import BaseNode
from aiohttp import ClientSession
from .signals import PruneSignal, ReQueueAfterSignal

logger = logging.getLogger(__name__)

@@ -159,6 +160,18 @@ def _get_secrets_endpoint(self, state_id: str):
        Construct the endpoint URL for getting secrets.
        """
        return f"{self._state_manager_uri}/{str(self._state_manager_version)}/namespace/{self._namespace}/state/{state_id}/secrets"

    def _get_prune_endpoint(self, state_id: str):
        """
        Construct the endpoint URL for pruning a state.
        """
        return f"{self._state_manager_uri}/{str(self._state_manager_version)}/namespace/{self._namespace}/state/{state_id}/prune"

    def _get_requeue_after_endpoint(self, state_id: str):
        """
        Construct the endpoint URL for requeuing a state after a timedelta.
        """
        return f"{self._state_manager_uri}/{str(self._state_manager_version)}/namespace/{self._namespace}/state/{state_id}/re-enqueue-after"

async def _register(self):
"""
@@ -395,6 +408,16 @@ async def _worker(self, idx: int):

                await self._notify_executed(state["state_id"], outputs)
                logger.info(f"Notified executed state {state['state_id']} for node {node.__name__ if node else "unknown"}")

            except PruneSignal as prune_signal:
                logger.info(f"Pruning state {state['state_id']} for node {node.__name__ if node else "unknown"}")
                await prune_signal.send(self._get_prune_endpoint(state["state_id"]), self._key) # type: ignore
                logger.info(f"Pruned state {state['state_id']} for node {node.__name__ if node else "unknown"}")

            except ReQueueAfterSignal as requeue_signal:
                logger.info(f"Requeuing state {state['state_id']} for node {node.__name__ if node else "unknown"} after {requeue_signal.delay}")
                await requeue_signal.send(self._get_requeue_after_endpoint(state["state_id"]), self._key) # type: ignore
                logger.info(f"Requeued state {state['state_id']} for node {node.__name__ if node else "unknown"} after {requeue_signal.delay}")

            except Exception as e:
                logger.error(f"Error executing state {state['state_id']} for node {node.__name__ if node else "unknown"}: {e}")
71 changes: 71 additions & 0 deletions python-sdk/exospherehost/signals.py
@@ -0,0 +1,71 @@
from typing import Any
from aiohttp import ClientSession
from datetime import timedelta

class PruneSignal(Exception):
    """
    Exception used to signal that a prune operation should be performed.

    Args:
        data (dict[str, Any], optional): Additional data to include with the signal. Defaults to an empty dict.

    Note:
        Do not catch this Exception, let it bubble up to Runtime for handling at StateManager.
    """
    def __init__(self, data: dict[str, Any] | None = None):
        # Default to None so instances never share one mutable dict.
        self.data = data if data is not None else {}
        super().__init__(f"Prune signal received with data: {self.data} \n NOTE: Do not catch this Exception, let it bubble up to Runtime for handling at StateManager")

    async def send(self, endpoint: str, key: str):
        """
        Sends the prune signal to the specified endpoint.

        Args:
            endpoint (str): The URL to send the signal to.
            key (str): The API key to include in the request headers.

        Raises:
            Exception: If the HTTP request fails (status code != 200).
        """
        async with ClientSession() as session:
            async with session.post(endpoint, json=self.data, headers={"x-api-key": key}) as response:
                if response.status != 200:
                    raise Exception(f"Failed to send prune signal to {endpoint}")


class ReQueueAfterSignal(Exception):
    """
    Exception used to signal that a requeue operation should be performed after a specified timedelta.

    Args:
        delay (timedelta): The amount of time to wait before requeuing. Must be greater than 0.

    Note:
        Do not catch this Exception, let it bubble up to Runtime for handling at StateManager.
    """
    def __init__(self, delay: timedelta):
        self.delay = delay

        if self.delay.total_seconds() <= 0:
            raise Exception("Delay must be greater than 0")

        # Interpolate the delay value, not the timedelta class.
        super().__init__(f"ReQueueAfter signal received with timedelta: {delay} \n NOTE: Do not catch this Exception, let it bubble up to Runtime for handling at StateManager")

    async def send(self, endpoint: str, key: str):
        """
        Sends the requeue-after signal to the specified endpoint.

        Args:
            endpoint (str): The URL to send the signal to.
            key (str): The API key to include in the request headers.

        Raises:
            Exception: If the HTTP request fails (status code != 200).
        """
        # The delay is sent as whole milliseconds.
        body = {
            "enqueue_after": int(self.delay.total_seconds() * 1000)
        }
        async with ClientSession() as session:
            async with session.post(endpoint, json=body, headers={"x-api-key": key}) as response:
                if response.status != 200:
                    raise Exception(f"Failed to send requeue after signal to {endpoint}")
2 changes: 1 addition & 1 deletion python-sdk/tests/test_package_init.py
@@ -15,7 +15,7 @@ def test_package_all_imports():
    """Test that __all__ contains all expected exports."""
    from exospherehost import __all__

-    expected_exports = ["Runtime", "BaseNode", "StateManager", "TriggerState", "VERSION"]
+    expected_exports = ["Runtime", "BaseNode", "StateManager", "TriggerState", "VERSION", "PruneSignal", "ReQueueAfterSignal"]

    for export in expected_exports:
        assert export in __all__, f"{export} should be in __all__"