Add workflow cancellation support for Redis queue mode #321
Conversation
Check for job cancellation before each command execution in CommandExecutor. This allows workflows running in Redis queue mode to stop at the next command boundary when the user presses Stop, rather than requiring multiple presses. https://claude.ai/code/session_013VFNJ1Sznugpony3r2Mgxt
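The check-at-the-command-boundary mechanism described above can be sketched minimally as follows. `WorkflowCancelled`, `set_cancellation_check()`, and `_check_cancellation()` come from the PR; everything else (constructor shape, message text) is illustrative, not the PR's actual code:

```python
class WorkflowCancelled(Exception):
    """Raised when the user stops a running workflow."""


class CommandExecutor:
    def __init__(self):
        # Optional zero-argument callable; None until a check is registered.
        self._should_stop = None

    def set_cancellation_check(self, should_stop):
        """Register a callable that returns True when the job was stopped."""
        self._should_stop = should_stop

    def _check_cancellation(self):
        # Called at each command boundary: raise instead of starting the next command.
        if self._should_stop is not None and self._should_stop():
            raise WorkflowCancelled("Workflow stopped by user")
```

With this shape, a single Stop press is observed at the next boundary rather than requiring repeated presses.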
📝 Walkthrough
The changes introduce a workflow cancellation mechanism.
Sequence Diagram

    sequenceDiagram
        participant Task
        participant CommandExecutor
        participant JobState
        Task->>CommandExecutor: set_cancellation_check(should_stop_func)
        CommandExecutor->>CommandExecutor: Store should_stop callable
        loop Command Execution Loop
            CommandExecutor->>CommandExecutor: _check_cancellation()
            CommandExecutor->>JobState: should_stop_func() invokes<br/>job.refresh() & check is_stopped
            alt Job is Stopped
                JobState-->>CommandExecutor: True
                CommandExecutor->>CommandExecutor: Raise WorkflowCancelled
                CommandExecutor-->>Task: WorkflowCancelled exception
                Task->>Task: Log cancellation notice<br/>Cleanup resources
            else Job is Running
                JobState-->>CommandExecutor: False
                CommandExecutor->>CommandExecutor: Execute run_command()
            end
        end
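The `should_stop_func` leg of the diagram — refresh the job, then read `is_stopped` — can be wired up roughly like this. `FakeJob` and `make_should_stop` are illustrative stand-ins; the real callable would operate on the Redis-backed job object:

```python
class FakeJob:
    """Stand-in for the Redis-backed job state."""

    def __init__(self):
        self.is_stopped = False

    def refresh(self):
        pass  # the real implementation would reload job state from Redis


def make_should_stop(job):
    # Matches the diagram: each poll refreshes the job, then checks is_stopped.
    def should_stop():
        job.refresh()
        return job.is_stopped

    return should_stop


job = FakeJob()
check = make_should_stop(job)
print(check())        # False while the job is running
job.is_stopped = True
print(check())        # True once the user pressed Stop
```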
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/workflow/CommandExecutor.py`:
- Around line 112-116: run_multiple_commands() currently swallows exceptions
raised by run_command() running in threads, so WorkflowCancelled may be ignored;
modify run_multiple_commands() to collect exceptions from each thread (e.g.,
append exception objects to a shared list or set via a thread-safe structure)
and also store per-task success flags, then after joining all threads check the
collected exceptions and if any contain a WorkflowCancelled instance re-raise
that exception (or raise the first WorkflowCancelled) before returning; ensure
run_command(), run_multiple_commands(), and any usage of _check_cancellation()
reflect that exceptions propagate out of the parallel join logic rather than
being masked by all(results).
        WorkflowCancelled: If the workflow was cancelled by the user.
        Exception: If the command execution results in any errors.
    """
    # Check for cancellation before starting the command
    self._check_cancellation()
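A `run_command()` consistent with that excerpt might look like the following sketch. The constructor and the `subprocess` invocation are assumptions for illustration, not the PR's actual code:

```python
import subprocess


class WorkflowCancelled(Exception):
    """Raised when the workflow was cancelled by the user."""


class CommandExecutor:
    def __init__(self, should_stop=None):
        self._should_stop = should_stop

    def _check_cancellation(self):
        if self._should_stop is not None and self._should_stop():
            raise WorkflowCancelled("Workflow stopped by user")

    def run_command(self, command):
        """Run one command.

        Raises:
            WorkflowCancelled: If the workflow was cancelled by the user.
        """
        # Check for cancellation before starting the command
        self._check_cancellation()
        return subprocess.run(command, shell=True).returncode == 0
```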
Propagate WorkflowCancelled out of parallel execution
When run_command() raises inside run_multiple_commands() threads, the exception is swallowed and all(results) can still return True, so cancellations can be ignored. Consider capturing thread exceptions and re-raising after joins.
Suggested fix

    -        results = []
    +        results = []
    +        exceptions = []
             lock = threading.Lock()

             def run_and_track(cmd):
    -            success = self.run_command(cmd)
    -            with lock:
    -                results.append(success)
    +            try:
    +                success = self.run_command(cmd)
    +                with lock:
    +                    results.append(success)
    +            except Exception as e:
    +                with lock:
    +                    exceptions.append(e)
    @@
             for thread in threads:
                 thread.join()
    +        if exceptions:
    +            for e in exceptions:
    +                if isinstance(e, WorkflowCancelled):
    +                    raise e
    +            raise exceptions[0]
    +
    -        return all(results)
    +        return len(results) == len(commands) and all(results)
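Applied in full, the suggested fix behaves like this standalone sketch. Here `run_command` is passed in as a callable rather than being a method, a hypothetical refactor so the join-and-re-raise logic can be shown and tested in isolation:

```python
import threading


class WorkflowCancelled(Exception):
    """Raised when the user stops a running workflow."""


def run_multiple_commands(run_command, commands):
    """Run commands in parallel; re-raise collected exceptions after joining."""
    results = []
    exceptions = []
    lock = threading.Lock()

    def run_and_track(cmd):
        try:
            success = run_command(cmd)
            with lock:
                results.append(success)
        except Exception as e:  # collected and re-raised after the joins
            with lock:
                exceptions.append(e)

    threads = [threading.Thread(target=run_and_track, args=(c,)) for c in commands]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Cancellation wins over other failures; otherwise surface the first error.
    if exceptions:
        for e in exceptions:
            if isinstance(e, WorkflowCancelled):
                raise e
        raise exceptions[0]
    return len(results) == len(commands) and all(results)
```

The final `len(results) == len(commands)` guard keeps a silently missing result from being masked by `all([])` evaluating to True.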
Summary
This PR adds support for cancelling workflows that are running in Redis queue mode. Users can now stop long-running workflows between command executions, with proper cleanup and status reporting.
Key Changes
- `WorkflowCancelled` exception: custom exception to signal user-initiated workflow cancellation
- `set_cancellation_check()` method added to `CommandExecutor` that accepts a callable to check if a workflow should stop
- `_check_cancellation()` is called before each command execution to allow stopping workflows between commands
- In `execute_workflow()`, a cancellation check is set up that monitors the Redis job's `is_stopped` status
- `WorkflowCancelled` is handled so that a `cancelled: True` flag is recorded

Implementation Details
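The `cancelled: True` handling might be wired roughly as follows. The `job_meta` dict, the `status` key, and the function signature are illustrative assumptions; only the flag itself and the exception name come from the PR:

```python
class WorkflowCancelled(Exception):
    """Raised when the user stops a running workflow."""


def execute_workflow(run, job_meta):
    """Run the workflow body, converting cancellation into job metadata."""
    try:
        run()
    except WorkflowCancelled:
        # Record the user-initiated stop so status reporting can surface it,
        # then fall through so cleanup proceeds normally.
        job_meta["cancelled"] = True
        job_meta["status"] = "cancelled"
    return job_meta
```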