Summary
Implement worker failure detection and automatic recovery for both transient and durable workflows in PyWorkflow with Celery.
Problem
When a Celery worker crashes mid-execution:
- Workflow stays in `RUNNING` status indefinitely (orphaned)
- No distinction between application failures (bugs) and infrastructure failures (worker crash)
- Current config (`task_reject_on_worker_lost=True`) requeues the task, but no workflow state update occurs
Proposed Solution
1. New Event Type: WORKFLOW_INTERRUPTED
Introduce a new event type to distinguish infrastructure failures from application errors:
```python
class EventType(str, Enum):
    WORKFLOW_INTERRUPTED = "workflow.interrupted"
```
Event data:
```
{
  "reason": "worker_lost" | "timeout" | "signal",
  "worker_id": str,
  "last_event_sequence": int,
  "error": str | None,
  "recoverable": bool
}
```
2. New RunStatus: INTERRUPTED
```python
class RunStatus(str, Enum):
    INTERRUPTED = "interrupted"  # Recoverable infrastructure failure
```
3. Recovery Configuration
```python
@workflow(
    durable=True,
    recover_on_worker_loss=True,  # Default: True for durable, False for transient
    max_recovery_attempts=3,
)
async def my_workflow():
    ...
```
4. Behavior by Mode
Transient workflows (durable=False):
- Default: `recover_on_worker_loss=False`
- On worker loss: Mark as FAILED (no state to recover from)
- If `recover_on_worker_loss=True`: Reschedule from scratch
Durable workflows (durable=True):
- Default: `recover_on_worker_loss=True`
- On worker loss: Record `WORKFLOW_INTERRUPTED` event, status → INTERRUPTED
- Reschedule task → replay events → continue from last checkpoint
- Track recovery attempts to prevent infinite loops
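The per-mode behavior above boils down to a small decision function. The sketch below is illustrative only; `RecoveryPolicy` and `decide_on_worker_loss` are hypothetical names, not the actual PyWorkflow API:

```python
from dataclasses import dataclass


@dataclass
class RecoveryPolicy:
    """Hypothetical container for the decorator options above."""
    durable: bool
    recover_on_worker_loss: bool
    max_recovery_attempts: int = 3


def decide_on_worker_loss(policy: RecoveryPolicy, attempts: int) -> str:
    """Return the action to take when a worker is lost mid-run."""
    if not policy.recover_on_worker_loss:
        # Transient default: nothing to recover from, fail the run
        return "fail"
    if not policy.durable:
        # Transient with opt-in recovery: no event log, restart from scratch
        return "restart"
    if attempts < policy.max_recovery_attempts:
        # Durable: record WORKFLOW_INTERRUPTED, then replay from last checkpoint
        return "resume"
    return "fail"  # exceeded max_recovery_attempts
```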
5. Recovery Flow
```
Worker crashes during workflow execution
    ↓
Celery detects worker loss (task_reject_on_worker_lost=True)
    ↓
on_failure() callback triggered
    ↓
Check workflow config (recover_on_worker_loss flag)
    ↓
If durable & recover_on_worker_loss=True:
    → Record WORKFLOW_INTERRUPTED event
    → Increment recovery_attempts in WorkflowRun
    → If attempts < max_recovery_attempts:
        → Reschedule task
        → New worker picks up
        → Replay events to last checkpoint
        → Continue execution
    → Else:
        → Mark FAILED (exceeded recovery attempts)
```
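The flow above could be sketched as follows, decoupled from Celery for clarity. The `storage` methods and the `reschedule` callable are assumptions for illustration, not the actual PyWorkflow storage API:

```python
def handle_worker_loss(storage, reschedule, run_id: str, worker_id: str) -> None:
    """Sketch of the recovery flow triggered from a failure callback.

    `storage.get_run`, `storage.record_event`, `storage.update_run_status`,
    and `storage.update_run_recovery_attempts` are hypothetical names.
    """
    run = storage.get_run(run_id)
    cfg = run["config"]
    if not (cfg["durable"] and cfg["recover_on_worker_loss"]):
        storage.update_run_status(run_id, "failed")
        return
    # Record the infrastructure failure as a first-class event
    storage.record_event(run_id, {
        "type": "workflow.interrupted",
        "data": {
            "reason": "worker_lost",
            "worker_id": worker_id,
            "last_event_sequence": run["last_event_sequence"],
            "error": None,
            "recoverable": True,
        },
    })
    # Increment and read back the attempt counter
    attempts = storage.update_run_recovery_attempts(run_id)
    if attempts < cfg["max_recovery_attempts"]:
        storage.update_run_status(run_id, "interrupted")
        reschedule(run_id)  # a new worker replays events to the last checkpoint
    else:
        storage.update_run_status(run_id, "failed")
```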
Why New Event Type vs Reusing WORKFLOW_FAILED
| Aspect | WORKFLOW_FAILED | WORKFLOW_INTERRUPTED |
|---|---|---|
| Cause | Application error | Infrastructure failure |
| Default action | Stop (need code fix) | Auto-retry/resume |
| Who investigates | Developers | Ops team |
| Recovery | Manual rerun | Automatic |
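For reference, the `WORKFLOW_INTERRUPTED` payload from section 1 could be assembled by a small helper. The function name and signature below are illustrative assumptions, not the actual PyWorkflow API:

```python
from typing import Optional


def make_interrupted_event(reason: str, worker_id: str,
                           last_event_sequence: int,
                           error: Optional[str] = None,
                           recoverable: bool = True) -> dict:
    """Build a WORKFLOW_INTERRUPTED event payload (hypothetical helper)."""
    # Restrict reason to the values listed in the event-data schema
    assert reason in ("worker_lost", "timeout", "signal")
    return {
        "type": "workflow.interrupted",
        "data": {
            "reason": reason,
            "worker_id": worker_id,
            "last_event_sequence": last_event_sequence,
            "error": error,
            "recoverable": recoverable,
        },
    }
```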
Implementation Status
- Add `WORKFLOW_INTERRUPTED` event type and helper function
- Add `RunStatus.INTERRUPTED` status
- Add `recovery_attempts` field to WorkflowRun schema
- Add `recover_on_worker_loss` and `max_recovery_attempts` config options
- Implement `on_failure()` callback in Celery WorkflowTask
- Implement recovery scheduling logic
- Update replay mechanism to handle WORKFLOW_INTERRUPTED
- Write unit tests (18 tests added)
- Write integration tests (12 tests added)
- Update documentation
Files Modified
- `pyworkflow/engine/events.py` - Add `WORKFLOW_INTERRUPTED` event type
- `pyworkflow/storage/schemas.py` - Add `INTERRUPTED` status, `recovery_attempts` field
- `pyworkflow/core/workflow.py` - Add new decorator parameters
- `pyworkflow/celery/tasks.py` - Implement failure callbacks and recovery logic
- `pyworkflow/engine/replay.py` - Handle `WORKFLOW_INTERRUPTED` in replay
- `pyworkflow/config.py` - Add recovery configuration options
- `pyworkflow/storage/base.py` - Add `update_run_recovery_attempts` method
- `pyworkflow/storage/file.py` - Implement `update_run_recovery_attempts`
- `pyworkflow/storage/memory.py` - Implement `update_run_recovery_attempts`
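The memory backend's `update_run_recovery_attempts` might look like the sketch below. The class and its internals are assumptions for illustration; the one point worth noting is that the increment should be atomic, since concurrent failure callbacks could otherwise race:

```python
import threading


class InMemoryRecoveryStore:
    """Illustrative in-memory counter for recovery attempts (hypothetical class)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._attempts: dict = {}

    def update_run_recovery_attempts(self, run_id: str) -> int:
        """Increment the run's attempt counter atomically; return the new count."""
        with self._lock:
            self._attempts[run_id] = self._attempts.get(run_id, 0) + 1
            return self._attempts[run_id]
```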
Test Coverage
- Unit tests: 18 new tests covering event creation, status transitions, config parsing
- Integration tests: 12 new tests covering recovery flow, event replay, status transitions
- All tests passing: 98 total tests