Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pyworkflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,21 @@
WorkflowRun,
)

# Streams (pub/sub signal pattern)
from pyworkflow.streams import (
CheckpointBackend,
Signal,
Stream,
StreamConsumer,
StreamStepContext,
emit,
get_checkpoint,
get_current_signal,
save_checkpoint,
stream_step,
stream_workflow,
)

__all__ = [
# Version
"__version__",
Expand Down Expand Up @@ -264,4 +279,16 @@
"get_logger",
"bind_workflow_context",
"bind_step_context",
# Streams
"stream_workflow",
"stream_step",
"emit",
"Signal",
"Stream",
"StreamStepContext",
"StreamConsumer",
"CheckpointBackend",
"get_current_signal",
"get_checkpoint",
"save_checkpoint",
]
140 changes: 140 additions & 0 deletions pyworkflow/engine/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ class EventType(Enum):
CHILD_WORKFLOW_FAILED = "child_workflow.failed"
CHILD_WORKFLOW_CANCELLED = "child_workflow.cancelled"

# Stream signal events
SIGNAL_WAIT_STARTED = "signal.wait_started"
SIGNAL_RECEIVED = "signal.received"
SIGNAL_PUBLISHED = "signal.published"
STREAM_STEP_STARTED = "stream_step.started"
STREAM_STEP_COMPLETED = "stream_step.completed"
CHECKPOINT_SAVED = "checkpoint.saved"
CHECKPOINT_LOADED = "checkpoint.loaded"

# Schedule events
SCHEDULE_CREATED = "schedule.created"
SCHEDULE_UPDATED = "schedule.updated"
Expand Down Expand Up @@ -936,3 +945,134 @@ def create_schedule_backfill_completed_event(
"completed_at": datetime.now(UTC).isoformat(),
},
)


# Stream signal event creation helpers


def create_signal_wait_started_event(
    run_id: str,
    stream_id: str,
    signal_types: list[str],
    wait_sequence: int = 0,
) -> Event:
    """
    Build the event recorded when a stream step starts waiting for signals.

    Args:
        run_id: Identifier of the workflow run the event belongs to.
        stream_id: Identifier of the stream being waited on.
        signal_types: Signal type names stored on the event as given.
        wait_sequence: Sequence number of this wait; also embedded in the token.

    Returns:
        A ``SIGNAL_WAIT_STARTED`` event whose data carries the arguments, a
        ``stream:<stream_id>:<run_id>:<wait_sequence>`` token and the current
        UTC time in ISO-8601 form under ``started_at``.
    """
    wait_token = f"stream:{stream_id}:{run_id}:{wait_sequence}"
    event_data = {
        "stream_id": stream_id,
        "signal_types": signal_types,
        "wait_sequence": wait_sequence,
        "token": wait_token,
        "started_at": datetime.now(UTC).isoformat(),
    }
    return Event(run_id=run_id, type=EventType.SIGNAL_WAIT_STARTED, data=event_data)


def create_signal_received_event(
    run_id: str,
    signal_id: str,
    stream_id: str,
    signal_type: str,
    payload: Any,
) -> Event:
    """
    Build the event recorded when a stream step receives a signal.

    Args:
        run_id: Identifier of the workflow run the event belongs to.
        signal_id: Identifier of the received signal.
        stream_id: Identifier of the stream the signal arrived on.
        signal_type: Type name of the signal.
        payload: Signal payload, stored on the event unchanged.

    Returns:
        A ``SIGNAL_RECEIVED`` event whose data carries the arguments plus the
        current UTC time in ISO-8601 form under ``received_at``.
    """
    event_data = {
        "signal_id": signal_id,
        "stream_id": stream_id,
        "signal_type": signal_type,
        "payload": payload,
        "received_at": datetime.now(UTC).isoformat(),
    }
    return Event(run_id=run_id, type=EventType.SIGNAL_RECEIVED, data=event_data)


def create_signal_published_event(
    run_id: str,
    signal_id: str,
    stream_id: str,
    signal_type: str,
) -> Event:
    """
    Build the event recorded when a workflow or step emits a signal.

    Args:
        run_id: Identifier of the workflow run the event belongs to.
        signal_id: Identifier of the published signal.
        stream_id: Identifier of the stream the signal was published to.
        signal_type: Type name of the signal.

    Returns:
        A ``SIGNAL_PUBLISHED`` event whose data carries the identifiers plus
        the current UTC time in ISO-8601 form under ``published_at``.
    """
    event_data = {
        "signal_id": signal_id,
        "stream_id": stream_id,
        "signal_type": signal_type,
        "published_at": datetime.now(UTC).isoformat(),
    }
    return Event(run_id=run_id, type=EventType.SIGNAL_PUBLISHED, data=event_data)


def create_stream_step_started_event(
    run_id: str,
    stream_id: str,
    step_name: str,
    signal_types: list[str],
) -> Event:
    """
    Build the event recorded when a stream step starts.

    Args:
        run_id: Identifier of the workflow run the event belongs to.
        stream_id: Identifier of the stream the step is attached to.
        step_name: Name of the stream step.
        signal_types: Signal type names stored on the event as given.

    Returns:
        A ``STREAM_STEP_STARTED`` event whose data carries the arguments plus
        the current UTC time in ISO-8601 form under ``started_at``.
    """
    event_data = {
        "stream_id": stream_id,
        "step_name": step_name,
        "signal_types": signal_types,
        "started_at": datetime.now(UTC).isoformat(),
    }
    return Event(run_id=run_id, type=EventType.STREAM_STEP_STARTED, data=event_data)


def create_stream_step_completed_event(
    run_id: str,
    stream_id: str,
    step_name: str,
    reason: str | None = None,
) -> Event:
    """
    Build the event recorded when a stream step completes.

    Args:
        run_id: Identifier of the workflow run the event belongs to.
        stream_id: Identifier of the stream the step is attached to.
        step_name: Name of the stream step.
        reason: Optional completion reason; stored as ``None`` when omitted.

    Returns:
        A ``STREAM_STEP_COMPLETED`` event whose data carries the arguments
        plus the current UTC time in ISO-8601 form under ``completed_at``.
    """
    event_data = {
        "stream_id": stream_id,
        "step_name": step_name,
        "reason": reason,
        "completed_at": datetime.now(UTC).isoformat(),
    }
    return Event(run_id=run_id, type=EventType.STREAM_STEP_COMPLETED, data=event_data)


def create_checkpoint_saved_event(
    run_id: str,
    step_run_id: str,
) -> Event:
    """
    Build the event recorded when a checkpoint is saved.

    Args:
        run_id: Identifier of the workflow run the event belongs to.
        step_run_id: Identifier of the step run the checkpoint is tied to.

    Returns:
        A ``CHECKPOINT_SAVED`` event whose data carries ``step_run_id`` and
        the current UTC time in ISO-8601 form under ``saved_at``.
    """
    event_data = {
        "step_run_id": step_run_id,
        "saved_at": datetime.now(UTC).isoformat(),
    }
    return Event(run_id=run_id, type=EventType.CHECKPOINT_SAVED, data=event_data)


def create_checkpoint_loaded_event(
    run_id: str,
    step_run_id: str,
) -> Event:
    """
    Build the event recorded when a checkpoint is loaded.

    Args:
        run_id: Identifier of the workflow run the event belongs to.
        step_run_id: Identifier of the step run the checkpoint is tied to.

    Returns:
        A ``CHECKPOINT_LOADED`` event whose data carries ``step_run_id`` and
        the current UTC time in ISO-8601 form under ``loaded_at``.
    """
    event_data = {
        "step_run_id": step_run_id,
        "loaded_at": datetime.now(UTC).isoformat(),
    }
    return Event(run_id=run_id, type=EventType.CHECKPOINT_LOADED, data=event_data)
49 changes: 49 additions & 0 deletions pyworkflow/engine/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ async def _apply_event(self, ctx: LocalContext, event: Event) -> None:
elif event.type == EventType.CANCELLATION_REQUESTED:
await self._apply_cancellation_requested(ctx, event)

elif event.type == EventType.SIGNAL_WAIT_STARTED:
await self._apply_signal_wait_started(ctx, event)

elif event.type == EventType.SIGNAL_RECEIVED:
await self._apply_signal_received(ctx, event)

# Other event types don't affect replay state
# (workflow_started, step_started, step_failed, etc. are informational)

Expand Down Expand Up @@ -253,6 +259,49 @@ async def _apply_workflow_interrupted(self, ctx: LocalContext, event: Event) ->
last_event_sequence=last_event_sequence,
)

async def _apply_signal_wait_started(self, ctx: LocalContext, event: Event) -> None:
    """
    Apply a signal_wait_started event by recording the pending wait on ``ctx``.

    The wait is stored in ``ctx._signal_waits`` (created on first use) under
    the key ``"<stream_id>:<wait_sequence>"``. Events without a truthy
    ``stream_id`` are ignored.

    Args:
        ctx: Replay context that receives the wait state.
        event: Event whose data supplies ``stream_id``, ``signal_types``
            (default ``[]``) and ``wait_sequence`` (default ``0``).
    """
    stream_id = event.data.get("stream_id")
    signal_types = event.data.get("signal_types", [])
    wait_sequence = event.data.get("wait_sequence", 0)

    if not stream_id:
        return

    # The registry is attached to the context lazily, on the first wait seen.
    try:
        pending_waits = ctx._signal_waits
    except AttributeError:
        pending_waits = ctx._signal_waits = {}

    wait_key = f"{stream_id}:{wait_sequence}"
    pending_waits[wait_key] = {
        "stream_id": stream_id,
        "signal_types": signal_types,
        "wait_sequence": wait_sequence,
    }
    logger.debug(
        f"Signal wait pending: {stream_id} (seq {wait_sequence})",
        run_id=ctx.run_id,
        stream_id=stream_id,
    )

async def _apply_signal_received(self, ctx: LocalContext, event: Event) -> None:
    """
    Apply a signal_received event by caching the signal's data on ``ctx``.

    The signal is stored in ``ctx._received_signals`` (created on first use)
    keyed by its ``signal_id``. Events without a truthy ``signal_id`` are
    ignored.

    Args:
        ctx: Replay context that receives the cached signal.
        event: Event whose data supplies ``signal_id``, ``stream_id``,
            ``signal_type`` and ``payload``.
    """
    signal_id = event.data.get("signal_id")
    stream_id = event.data.get("stream_id")
    signal_type = event.data.get("signal_type")
    payload = event.data.get("payload")

    if not signal_id:
        return

    # The cache is attached to the context lazily, on the first signal seen.
    try:
        received = ctx._received_signals
    except AttributeError:
        received = ctx._received_signals = {}

    received[signal_id] = {
        "signal_id": signal_id,
        "stream_id": stream_id,
        "signal_type": signal_type,
        "payload": payload,
    }
    logger.debug(
        f"Signal received: {signal_id} ({signal_type})",
        run_id=ctx.run_id,
        signal_id=signal_id,
    )

async def _apply_cancellation_requested(self, ctx: LocalContext, event: Event) -> None:
"""
Apply cancellation_requested event - mark workflow for cancellation.
Expand Down
Loading
Loading