From 6bd7e4ea60035ae7cee974000a601f99da81fd4d Mon Sep 17 00:00:00 2001 From: Ben Thomas Date: Tue, 29 Jul 2025 12:24:25 -0700 Subject: [PATCH 01/11] Adding draft version of workflows design doc. --- docs/design/workflows_updated.md | 636 +++++++++++++++++++++++++++++++ 1 file changed, 636 insertions(+) create mode 100644 docs/design/workflows_updated.md diff --git a/docs/design/workflows_updated.md b/docs/design/workflows_updated.md new file mode 100644 index 0000000000..9411de9c21 --- /dev/null +++ b/docs/design/workflows_updated.md @@ -0,0 +1,636 @@ +# Semantic Kernel Workflow Framework - Technical Design Document + +## Table of Contents +1. [Executive Summary](#executive-summary) +2. [Introduction](#introduction) +3. [Architecture Overview](#architecture-overview) +4. [Core Components](#core-components) +5. [Execution Model](#execution-model) +6. [API Design](#api-design) +7. [Pattern Implementation](#pattern-implementation) +8. [Event System](#event-system) +9. [State Management](#state-management) +10. [Human-in-the-Loop Support](#human-in-the-loop-support) +11. [Advanced Features](#advanced-features) +12. [Security Considerations](#security-considerations) +13. [Performance Considerations](#performance-considerations) +14. [Future Enhancements](#future-enhancements) + +## Executive Summary + +The Semantic Kernel Workflow Framework is a sophisticated orchestration system designed to manage complex multi-agent workflows with support for various execution patterns including sequential, concurrent, conditional, and human-in-the-loop scenarios. Built on a graph-based architecture using Pregel-style execution, the framework provides a flexible and extensible foundation for building AI-powered applications. 
+ +Key features include: +- Type-safe executor-based architecture +- Asynchronous event-driven execution +- Built-in support for common patterns (sequential, fan-out/fan-in, loops) +- Human-in-the-loop capabilities +- Shared state management with thread-safe operations +- Comprehensive event streaming for observability +- Checkpointing and resumption capabilities (planned) + +## Introduction + +### Purpose +The Workflow Framework serves as a middle layer between multi-agent orchestrations and the agent runtime, providing developers with a powerful abstraction for building complex AI workflows. It addresses the need for structured, observable, and maintainable agent coordination patterns. + +### Design Goals +1. **Type Safety**: Enforce strong typing throughout the workflow pipeline +2. **Flexibility**: Support various execution patterns without framework modifications +3. **Observability**: Provide comprehensive event streaming for monitoring and debugging +4. **Extensibility**: Allow custom executors and patterns to be easily integrated +5. **Performance**: Enable concurrent execution where possible +6. 
**Reliability**: Support checkpointing and fault tolerance (future) + +### Target Use Cases +- Multi-agent group chats with various routing strategies +- Map-reduce style data processing pipelines +- Sequential task chains with conditional branching +- Human-in-the-loop decision workflows +- Complex orchestrations requiring state management + +## Architecture Overview + +The framework follows a graph-based architecture where: +- **Nodes** are represented by `Executor` instances +- **Edges** define the flow of data between executors +- **Messages** carry typed data through the graph +- **Events** provide observability into the execution + +``` +┌─────────────┐ Edge ┌─────────────┐ +│ Executor A │──────────────▶│ Executor B │ +└─────────────┘ └─────────────┘ + │ │ + │ ┌─────────────┐ │ + └────────▶│ Executor C │◀────┘ + └─────────────┘ +``` + +### Key Architectural Decisions + +1. **Executor-Based Design**: Each processing unit is an executor with strongly typed input/output +2. **Pregel-Style Execution**: Superstep-based execution model for predictable behavior +3. **Event-Driven Communication**: Asynchronous message passing between executors +4. **Shared State Management**: Thread-safe shared state with atomic operations + +## Core Components + +### 1. Executor (`executor.py`) + +The fundamental processing unit in the workflow system. 
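Before looking at the base class itself, it helps to see how an executor can discover its declared input type at runtime. The sketch below is standalone and illustrative, not the framework's actual implementation: it recovers the `T` bound in a declaration like `Executor[str]` from `__orig_bases__` so that `can_handle` can validate incoming messages.

```python
from typing import Any, Generic, TypeVar, get_args, get_origin

T = TypeVar("T")

class Executor(Generic[T]):
    """Illustrative stand-in for the framework's executor base class."""

    def _extract_type_parameter(self) -> type:
        # Find the type argument bound in a subclass declaration
        # such as `class UpperCase(Executor[str])`.
        for base in getattr(type(self), "__orig_bases__", ()):
            if get_origin(base) is Executor:
                return get_args(base)[0]
        return object  # unparameterized: accept anything

    def can_handle(self, data: Any) -> bool:
        # Type check used when deciding whether to deliver a message.
        return isinstance(data, self._extract_type_parameter())

class UpperCase(Executor[str]):
    pass
```

With this in place, `UpperCase().can_handle("hi")` is true while `UpperCase().can_handle(42)` is false, which is exactly the check the edge-routing logic relies on. The design's base class is sketched below.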
+ +```python +class Executor(Generic[T], ABC): + """Base class for all workflow executors""" + + def __init__(self, id: str | None = None): + self._id = id or str(uuid.uuid4()) + self._input_type = self._extract_type_parameter() + + @abstractmethod + async def _execute(self, data: T, ctx: ExecutorContext) -> Any: + """Execute logic to be implemented by subclasses""" + + async def execute(self, data: T, ctx: ExecutorContext) -> Any: + """Wrapper that emits events and calls _execute""" + + def can_handle(self, data: Any) -> bool: + """Type checking for incoming messages""" +``` + +**Key Features:** +- Generic type parameter for input type safety +- Automatic type extraction and validation +- Built-in event emission for observability +- Unique identifier for routing and debugging + +### 2. Edge (`_edge.py`) + +Represents directed connections between executors with optional routing conditions. + +```python +class _Edge: + """Directed edge with conditional routing support""" + + def __init__(self, source: Executor, target: Executor, + condition: Callable[[Any], bool] | None = None): + self.source = source + self.target = target + self._condition = condition + self._edge_group_ids: list[str] = [] +``` + +**Key Features:** +- Conditional routing based on message content +- Edge groups for fan-in synchronization +- Type-aware message filtering +- Support for complex routing patterns + +### 3. Workflow (`workflow.py`) + +The main orchestration container that manages execution. 
+ +```python +class Workflow: + """Workflow container managing executors and execution""" + + def __init__(self, edges: list[_Edge], start_executor: Executor | str, + execution_context: ExecutionContext): + self._edges = edges + self._start_executor = start_executor + self._runner = _Runner(edges, shared_state, execution_context) + + async def run_stream(self, message: Any, + executor: Executor | str | None = None) -> AsyncIterable[WorkflowEvent]: + """Stream execution events as the workflow runs""" +``` + +### 4. WorkflowBuilder (`workflow.py`) + +Fluent API for constructing workflows. + +```python +class WorkflowBuilder: + """Builder pattern for workflow construction""" + + def add_edge(self, source: Executor, target: Executor, + condition: Callable[[Any], bool] | None = None) -> Self + def add_fan_out_edges(self, source: Executor, targets: list[Executor]) -> Self + def add_fan_in_edges(self, sources: list[Executor], target: Executor, + activation: Activation = Activation.WhenAll) -> Self + def add_loop(self, source: Executor, target: Executor, + condition: Callable[[Any], bool] | None = None) -> Self + def add_chain(self, executors: list[Executor]) -> Self +``` + +### 5. Runner (`_runner.py`) + +Internal component managing the Pregel-style execution. + +```python +class _Runner: + """Manages superstep-based workflow execution""" + + async def run_until_convergence(self) -> AsyncIterable[WorkflowEvent]: + """Run supersteps until no messages remain""" + + async def _run_iteration(self): + """Execute one superstep of message delivery""" +``` + +### 6. ExecutionContext (`execution_context.py`) + +Protocol defining the execution environment interface. 
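Because the context is declared as a `@runtime_checkable` Protocol, any object that provides the same async methods satisfies it structurally, with no inheritance required. The following minimal in-memory sketch (assumed names, not the framework's implementation) covers the messaging half of the interface:

```python
import asyncio
from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class MessageSink(Protocol):
    """Structural stand-in for the messaging half of the context protocol."""
    async def send_message(self, source_id: str, message: Any) -> None: ...
    async def drain_messages(self) -> dict[str, list[Any]]: ...

class InMemoryContext:
    """Buffers messages per source id; draining empties the buffers."""

    def __init__(self) -> None:
        self._messages: dict[str, list[Any]] = {}

    async def send_message(self, source_id: str, message: Any) -> None:
        self._messages.setdefault(source_id, []).append(message)

    async def drain_messages(self) -> dict[str, list[Any]]:
        # Hand back everything accumulated so far and reset the buffers.
        drained, self._messages = self._messages, {}
        return drained

async def demo() -> dict[str, list[Any]]:
    ctx = InMemoryContext()
    assert isinstance(ctx, MessageSink)  # structural check, no inheritance needed
    await ctx.send_message("a", 1)
    await ctx.send_message("a", 2)
    return await ctx.drain_messages()
```

`asyncio.run(demo())` yields `{"a": [1, 2]}`, and a second drain on the same context would return an empty mapping. The protocol as specified in the design follows.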
+ +```python +@runtime_checkable +class ExecutionContext(Protocol): + """Execution context for message passing and event handling""" + + async def send_message(self, source_id: str, message: Any) -> None + async def drain_messages(self) -> dict[str, list[Any]] + async def add_event(self, event: WorkflowEvent) -> None + async def drain_events(self) -> list[WorkflowEvent] +``` + +## Execution Model + +### Pregel-Style Supersteps + +The framework uses a modified Pregel execution model: + +1. **Initialization**: Start executor receives initial message +2. **Superstep Loop**: + - Collect all pending messages + - Deliver messages to target executors concurrently + - Execute all triggered executors + - Collect new messages and events +3. **Convergence**: Continue until no messages remain or max iterations reached + +### Message Delivery + +Messages are delivered according to edge configurations: +- **Direct routing**: Message flows if type matches and condition passes +- **Fan-out**: Single source sends to multiple targets +- **Fan-in**: Multiple sources synchronize at single target +- **Conditional**: Routing based on message content + +### Concurrency Model + +- Executors within a superstep run concurrently +- Messages from different sources are delivered in parallel +- Shared state access is protected by locks +- Event collection is thread-safe + +## API Design + +### Creating Executors + +```python +@output_message_types(str) +class UpperCaseExecutor(Executor[str]): + async def _execute(self, data: str, ctx: ExecutorContext) -> str: + result = data.upper() + await ctx.send_message(result) + return result +``` + +### Building Workflows + +```python +# Sequential workflow +workflow = ( + WorkflowBuilder() + .add_chain([executor_a, executor_b, executor_c]) + .set_start_executor(executor_a) + .build() +) + +# Conditional routing +workflow = ( + WorkflowBuilder() + .add_edge(router, executor_a, lambda msg: msg.type == "A") + .add_edge(router, executor_b, lambda msg: 
msg.type == "B")
    .set_start_executor(router)
    .build()
)

# Fan-out/Fan-in pattern
workflow = (
    WorkflowBuilder()
    .set_start_executor(splitter)
    .add_fan_out_edges(splitter, [worker1, worker2, worker3])
    .add_fan_in_edges([worker1, worker2, worker3], aggregator,
                      activation=Activation.WhenAll)
    .build()
)
```

### Running Workflows

```python
# Stream execution with event handling
async for event in workflow.run_stream(initial_message):
    if isinstance(event, ExecutorCompleteEvent):
        print(f"Executor {event.executor_id} completed")
    elif isinstance(event, HumanInTheLoopEvent):
        # Handle human intervention request, then resume at the requesting
        # executor. A distinct loop variable avoids shadowing `event`.
        user_input = await get_user_input()
        async for continuation in workflow.run_stream(user_input, executor=event.executor_id):
            print(continuation)  # process continuation events
```

## Pattern Implementation

### 1. Sequential Processing

Simple chain of executors processing data in order.

```python
workflow = (
    WorkflowBuilder()
    .add_chain([preprocessor, analyzer, formatter])
    .set_start_executor(preprocessor)
    .build()
)
```

### 2. Round-Robin Group Chat

Executors take turns in a predefined sequence.

```python
class RoundRobinGroupChatManager(Executor[list[ChatMessageContent]]):
    def __init__(self, members: list[str], max_rounds: int):
        self._members = members
        self._current_round = 0
        self._max_rounds = max_rounds

    async def _execute(self, data: list[ChatMessageContent],
                       ctx: ExecutorContext) -> AgentSelectionDecision | None:
        if self._current_round >= self._max_rounds:
            await ctx.add_event(WorkflowCompletedEvent(data=self._chat_history))
            return None

        selection = self._members[self._current_round % len(self._members)]
        decision = AgentSelectionDecision(messages=data, selection=selection)
        await ctx.send_message(decision)
        self._current_round += 1
        return decision
```

### 3. Map-Reduce Pattern

Parallel processing with aggregation. 
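The same dataflow can be sketched with plain `asyncio`, independent of the framework, using word counting as a stand-in workload (all names here are illustrative): chunks fan out to concurrent mappers, and a `WhenAll`-style gather fans the partial results back in.

```python
import asyncio
from collections import Counter

async def map_words(chunk: list[str]) -> Counter:
    # Map phase: count words within one chunk.
    return Counter(chunk)

async def word_count(text: str, workers: int = 3) -> Counter:
    words = text.split()
    size = max(1, len(words) // workers)
    chunks = [words[i:i + size] for i in range(0, len(words), size)]
    # Fan-out: one mapper task per chunk; fan-in: gather waits for all of them.
    partials = await asyncio.gather(*(map_words(c) for c in chunks))
    # Reduce/aggregate phase: merge the partial counts.
    return sum(partials, Counter())
```

Expressed with the builder API, the equivalent pipeline reads as follows: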
+ +```python +# Split -> Map (parallel) -> Shuffle -> Reduce (parallel) -> Aggregate +workflow = ( + WorkflowBuilder() + .set_start_executor(splitter) + .add_fan_out_edges(splitter, mappers) + .add_fan_in_edges(mappers, shuffler, activation=Activation.WhenAll) + .add_fan_out_edges(shuffler, reducers) + .add_fan_in_edges(reducers, aggregator, activation=Activation.WhenAll) + .build() +) +``` + +### 4. Conditional Branching + +Dynamic routing based on message content. + +```python +workflow = ( + WorkflowBuilder() + .add_edge(classifier, high_priority_handler, + lambda msg: msg.priority == "high") + .add_edge(classifier, normal_handler, + lambda msg: msg.priority == "normal") + .add_edge(classifier, low_priority_handler, + lambda msg: msg.priority == "low") + .set_start_executor(classifier) + .build() +) +``` + +## Event System + +### Event Types + +```python +# Workflow lifecycle events +WorkflowStartedEvent # Workflow execution begins +WorkflowCompletedEvent # Workflow reaches completion + +# Executor events +ExecutorInvokeEvent # Executor starts processing +ExecutorCompleteEvent # Executor finishes processing + +# Agent-specific events +AgentRunEvent # Agent produces final response +AgentRunStreamingEvent # Agent streams partial response + +# Control flow events +HumanInTheLoopEvent # Human intervention required +``` + +### Event Handling + +Events are emitted during execution and streamed to consumers: + +```python +async for event in workflow.run_stream(message): + match event: + case ExecutorCompleteEvent(executor_id=id, data=result): + logger.info(f"Executor {id} completed with result: {result}") + case HumanInTheLoopEvent(executor_id=id): + # Pause workflow for human input + human_response = await prompt_user() + # Resume with human input + case WorkflowCompletedEvent(data=final_result): + return final_result +``` + +## State Management + +### Shared State + +Thread-safe key-value store accessible to all executors. 
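The essential mechanics can be sketched with an `asyncio.Lock` (an illustrative standalone version, not the framework's class). One subtlety worth noting: because `asyncio.Lock` is not reentrant, a `hold()` that keeps the lock must expose the underlying state directly rather than re-entering the locked `get`/`set` helpers.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any

class SharedState:
    """Lock-guarded key-value store; `hold()` spans multiple operations atomically."""

    def __init__(self) -> None:
        self._state: dict[str, Any] = {}
        self._lock = asyncio.Lock()

    async def set(self, key: str, value: Any) -> None:
        async with self._lock:
            self._state[key] = value

    async def get(self, key: str, default: Any = None) -> Any:
        async with self._lock:
            return self._state.get(key, default)

    @asynccontextmanager
    async def hold(self):
        # Keep the lock for the whole block; yield the raw dict because
        # the lock is not reentrant and get()/set() would deadlock here.
        async with self._lock:
            yield self._state

async def demo() -> Any:
    ss = SharedState()
    await ss.set("counter", 1)
    async with ss.hold() as state:  # atomic read-modify-write
        state["counter"] += 1
    return await ss.get("counter")
```

The interface proposed in the design is shown below.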
+ +```python +class _SharedState: + """Thread-safe shared state management""" + + async def set(self, key: str, value: Any) -> None + async def get(self, key: str) -> Any + async def has(self, key: str) -> bool + async def delete(self, key: str) -> None + + @asynccontextmanager + async def hold(self): + """Hold lock for multiple operations""" +``` + +### Usage in Executors + +```python +class StatefulExecutor(Executor[str]): + async def _execute(self, data: str, ctx: ExecutorContext) -> None: + # Read from shared state + counter = await ctx.get_shared_state("counter") or 0 + + # Update shared state + await ctx.set_shared_state("counter", counter + 1) + + # Atomic multi-operation update + async with ctx._shared_state.hold(): + value1 = await ctx.get_shared_state("key1") + value2 = await ctx.get_shared_state("key2") + await ctx.set_shared_state("combined", value1 + value2) +``` + +## Human-in-the-Loop Support + +### Design Approach + +The framework supports human intervention through: +1. Special HIL executors that emit `HumanInTheLoopEvent` +2. Workflow suspension while awaiting human input +3. 
Targeted message delivery to resume execution + +### Implementation Example + +```python +class HumanInTheLoopExecutor(Executor[list[ChatMessageContent]]): + def __init__(self): + super().__init__() + self._awaiting_input = False + + async def _execute(self, data: list[ChatMessageContent], + ctx: ExecutorContext) -> list[ChatMessageContent] | None: + if not self._awaiting_input: + # Request human input + self._awaiting_input = True + await ctx.add_event(HumanInTheLoopEvent(executor_id=self.id)) + return None + else: + # Process human response + self._awaiting_input = False + await ctx.send_message(data) + return data +``` + +### Integration Pattern + +```python +# Main execution loop with HIL support +hil_event = None +while True: + if hil_event: + # Resume at specific executor with human input + events = workflow.run_stream(human_input, executor=hil_event.executor_id) + else: + # Normal execution + events = workflow.run_stream(initial_message) + + async for event in events: + if isinstance(event, HumanInTheLoopEvent): + hil_event = event + human_input = await get_human_input() + break + elif isinstance(event, WorkflowCompletedEvent): + return event.data +``` + +## Advanced Features + +### 1. Edge Groups and Synchronization + +Fan-in edges can be grouped for synchronized delivery: + +```python +# All three workers must complete before aggregator runs +workflow = ( + WorkflowBuilder() + .add_fan_in_edges([worker1, worker2, worker3], aggregator, + activation=Activation.WhenAll) + .build() +) +``` + +### 2. Type System Integration + +The framework leverages Python's type system for safety: + +```python +def _is_instance_of(data: Any, target_type: type) -> bool: + """Runtime type checking supporting generics""" + # Handles Union, Optional, List, Dict, Tuple types + # Provides comprehensive type validation +``` + +### 3. 
Custom Output Type Declaration + +Executors can declare multiple output types: + +```python +@output_message_types(ProcessedData, ErrorReport, None) +class DataProcessor(Executor[RawData]): + async def _execute(self, data: RawData, ctx: ExecutorContext): + try: + result = process(data) + await ctx.send_message(ProcessedData(result)) + except Exception as e: + await ctx.send_message(ErrorReport(str(e))) +``` + +### 4. Streaming Support + +Built-in support for streaming responses: + +```python +class StreamingAgentExecutor(Executor[str]): + async def _execute(self, prompt: str, ctx: ExecutorContext): + async for chunk in self.agent.stream(prompt): + await ctx.add_event(AgentRunStreamingEvent(self.id, chunk)) + + final_response = await self.agent.get_final() + await ctx.send_message(final_response) +``` + +## Security Considerations + +### 1. Type Safety +- Strong typing prevents type confusion attacks +- Runtime type validation catches mismatched messages +- Generic type parameters enforce compile-time safety + +### 2. State Isolation +- Executors cannot directly access each other's state +- Shared state requires explicit key-based access +- No global mutable state outside controlled interfaces + +### 3. Message Validation +- All messages are validated against executor input types +- Conditional routing provides additional filtering +- Malformed messages are rejected at edge boundaries + +### 4. Resource Limits +- Maximum iteration count prevents infinite loops +- Timeout support for long-running executors (planned) +- Memory usage bounded by message queue size + +## Performance Considerations + +### 1. Concurrency +- Superstep model enables parallel executor execution +- Message delivery within superstep is concurrent +- Async/await throughout for non-blocking I/O + +### 2. Memory Efficiency +- Messages are passed by reference where possible +- Event streaming prevents memory accumulation +- Lazy evaluation of conditional edges + +### 3. 
Scalability
- O(E + V) complexity per superstep (edges + vertices)
- Linear scaling with number of messages
- Shared state operations are O(1) average case

### 4. Optimization Opportunities
- Edge pre-computation for static workflows
- Message batching for high-throughput scenarios
- Executor pooling for stateless processors

## Future Enhancements

### 1. Checkpointing and Recovery
```python
class CheckpointProvider(Protocol):
    async def save_checkpoint(self, workflow_id: str, state: WorkflowState) -> str
    async def load_checkpoint(self, checkpoint_id: str) -> WorkflowState
    async def list_checkpoints(self, workflow_id: str) -> list[CheckpointInfo]
```

### 2. Distributed Execution
- Support for executor distribution across nodes
- Message passing via message queues
- Distributed shared state with consistency guarantees

### 3. Advanced Patterns
- Sub-workflow composition
- Dynamic executor instantiation
- Recursive workflow structures
- Time-based triggers and delays

### 4. Observability Enhancements
- OpenTelemetry integration
- Structured logging with correlation IDs
- Performance metrics and profiling
- Visual workflow debugging tools

### 5. Template System
```python
# Planned template system for reusable patterns
template = (
    WorkflowTemplate("map_reduce")
    .with_parameter("mapper_count", type=int, default=3)
    .with_parameter("reducer_count", type=int, default=2)
    .with_pattern(MapReducePattern())
    .build()
)

workflow = template.instantiate(mapper_count=5, reducer_count=3)
```

### 6. Error Handling and Retry
- Configurable retry policies per executor
- Dead letter queues for failed messages
- Circuit breaker pattern support
- Graceful degradation strategies

## Conclusion

The Semantic Kernel Workflow Framework provides a robust foundation for building complex AI-powered workflows. 
Its type-safe, event-driven architecture combined with flexible execution patterns makes it suitable for a wide range of applications from simple sequential processing to complex multi-agent orchestrations with human oversight. + +The framework's design prioritizes developer experience through its fluent API while maintaining the flexibility needed for advanced use cases. As the framework evolves, planned enhancements around distributed execution, checkpointing, and advanced patterns will further expand its capabilities while maintaining the core principles of type safety, observability, and extensibility. \ No newline at end of file From 56f4f4364e02e03452263228cf70f3fe73c782f4 Mon Sep 17 00:00:00 2001 From: Ben Thomas Date: Wed, 30 Jul 2025 14:15:20 -0700 Subject: [PATCH 02/11] Adding message flow diagrams and fixing group chat sample. --- docs/design/workflows_updated.md | 286 ++++++++++++++++++++++++------- 1 file changed, 226 insertions(+), 60 deletions(-) diff --git a/docs/design/workflows_updated.md b/docs/design/workflows_updated.md index 9411de9c21..a5ac02ba9c 100644 --- a/docs/design/workflows_updated.md +++ b/docs/design/workflows_updated.md @@ -1,6 +1,7 @@ -# Semantic Kernel Workflow Framework - Technical Design Document +# Agent Framework Workflows - Technical Design Document ## Table of Contents + 1. [Executive Summary](#executive-summary) 2. [Introduction](#introduction) 3. [Architecture Overview](#architecture-overview) @@ -21,6 +22,7 @@ The Semantic Kernel Workflow Framework is a sophisticated orchestration system designed to manage complex multi-agent workflows with support for various execution patterns including sequential, concurrent, conditional, and human-in-the-loop scenarios. Built on a graph-based architecture using Pregel-style execution, the framework provides a flexible and extensible foundation for building AI-powered applications. 
Key features include: + - Type-safe executor-based architecture - Asynchronous event-driven execution - Built-in support for common patterns (sequential, fan-out/fan-in, loops) @@ -32,9 +34,11 @@ Key features include: ## Introduction ### Purpose + The Workflow Framework serves as a middle layer between multi-agent orchestrations and the agent runtime, providing developers with a powerful abstraction for building complex AI workflows. It addresses the need for structured, observable, and maintainable agent coordination patterns. ### Design Goals + 1. **Type Safety**: Enforce strong typing throughout the workflow pipeline 2. **Flexibility**: Support various execution patterns without framework modifications 3. **Observability**: Provide comprehensive event streaming for monitoring and debugging @@ -43,6 +47,7 @@ The Workflow Framework serves as a middle layer between multi-agent orchestratio 6. **Reliability**: Support checkpointing and fault tolerance (future) ### Target Use Cases + - Multi-agent group chats with various routing strategies - Map-reduce style data processing pipelines - Sequential task chains with conditional branching @@ -52,6 +57,7 @@ The Workflow Framework serves as a middle layer between multi-agent orchestratio ## Architecture Overview The framework follows a graph-based architecture where: + - **Nodes** are represented by `Executor` instances - **Edges** define the flow of data between executors - **Messages** carry typed data through the graph @@ -83,23 +89,24 @@ The fundamental processing unit in the workflow system. 
```python class Executor(Generic[T], ABC): """Base class for all workflow executors""" - + def __init__(self, id: str | None = None): self._id = id or str(uuid.uuid4()) self._input_type = self._extract_type_parameter() - + @abstractmethod async def _execute(self, data: T, ctx: ExecutorContext) -> Any: """Execute logic to be implemented by subclasses""" - + async def execute(self, data: T, ctx: ExecutorContext) -> Any: """Wrapper that emits events and calls _execute""" - + def can_handle(self, data: Any) -> bool: """Type checking for incoming messages""" ``` **Key Features:** + - Generic type parameter for input type safety - Automatic type extraction and validation - Built-in event emission for observability @@ -112,8 +119,8 @@ Represents directed connections between executors with optional routing conditio ```python class _Edge: """Directed edge with conditional routing support""" - - def __init__(self, source: Executor, target: Executor, + + def __init__(self, source: Executor, target: Executor, condition: Callable[[Any], bool] | None = None): self.source = source self.target = target @@ -122,6 +129,7 @@ class _Edge: ``` **Key Features:** + - Conditional routing based on message content - Edge groups for fan-in synchronization - Type-aware message filtering @@ -134,14 +142,14 @@ The main orchestration container that manages execution. ```python class Workflow: """Workflow container managing executors and execution""" - + def __init__(self, edges: list[_Edge], start_executor: Executor | str, execution_context: ExecutionContext): self._edges = edges self._start_executor = start_executor self._runner = _Runner(edges, shared_state, execution_context) - - async def run_stream(self, message: Any, + + async def run_stream(self, message: Any, executor: Executor | str | None = None) -> AsyncIterable[WorkflowEvent]: """Stream execution events as the workflow runs""" ``` @@ -153,8 +161,8 @@ Fluent API for constructing workflows. 
```python class WorkflowBuilder: """Builder pattern for workflow construction""" - - def add_edge(self, source: Executor, target: Executor, + + def add_edge(self, source: Executor, target: Executor, condition: Callable[[Any], bool] | None = None) -> Self def add_fan_out_edges(self, source: Executor, targets: list[Executor]) -> Self def add_fan_in_edges(self, sources: list[Executor], target: Executor, @@ -171,10 +179,10 @@ Internal component managing the Pregel-style execution. ```python class _Runner: """Manages superstep-based workflow execution""" - + async def run_until_convergence(self) -> AsyncIterable[WorkflowEvent]: """Run supersteps until no messages remain""" - + async def _run_iteration(self): """Execute one superstep of message delivery""" ``` @@ -187,7 +195,7 @@ Protocol defining the execution environment interface. @runtime_checkable class ExecutionContext(Protocol): """Execution context for message passing and event handling""" - + async def send_message(self, source_id: str, message: Any) -> None async def drain_messages(self) -> dict[str, list[Any]] async def add_event(self, event: WorkflowEvent) -> None @@ -196,32 +204,171 @@ class ExecutionContext(Protocol): ## Execution Model +### Data Flow Architecture + +The workflow framework implements a type-safe message passing system with the following key principles: + +#### Executor Type System +- **Single Input Type**: Each executor declares exactly one input type via generic parameter `Executor[T]` +- **Any Output Type**: Executors can produce any output type from their `_execute()` method +- **Type Validation**: Messages are routed only if `target.can_handle(data)` returns true + +#### Edge Routing Logic +Messages flow along edges based on a two-step validation: +1. **Type Compatibility**: Target executor must be able to handle the message type +2. **Condition Check**: Optional edge condition (if present) must evaluate to true + +If either check fails, the message is ignored for that edge. 
+ ### Pregel-Style Supersteps -The framework uses a modified Pregel execution model: +The framework uses a modified Pregel execution model with clear data flow semantics: -1. **Initialization**: Start executor receives initial message -2. **Superstep Loop**: - - Collect all pending messages - - Deliver messages to target executors concurrently - - Execute all triggered executors - - Collect new messages and events -3. **Convergence**: Continue until no messages remain or max iterations reached +``` +Superstep N: +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ Collect All │───▶│ Route Messages │───▶│ Execute All │ +│ Pending │ │ Based on Type │ │ Target │ +│ Messages │ │ & Conditions │ │ Executors │ +└─────────────────┘ └─────────────────┘ └─────────────────┘ + │ +┌─────────────────┐ ┌─────────────────┐ │ +│ Start Next │◀───│ Emit Events & │◀────────────┘ +│ Superstep │ │ New Messages │ +└─────────────────┘ └─────────────────┘ +``` -### Message Delivery +### Message Delivery Patterns -Messages are delivered according to edge configurations: -- **Direct routing**: Message flows if type matches and condition passes -- **Fan-out**: Single source sends to multiple targets -- **Fan-in**: Multiple sources synchronize at single target -- **Conditional**: Routing based on message content +#### 1. Direct Routing (1:1) +``` +┌─────────────┐ message ┌─────────────┐ +│ Executor A │──────────────▶│ Executor B │ +│ Output: T │ │ Input: T │ +└─────────────┘ └─────────────┘ +``` + +**Sequence:** +```mermaid +sequenceDiagram + participant A as Executor A + participant Edge as Edge A→B + participant B as Executor B + + A->>Edge: send_message(data: T) + Edge->>B: can_handle(data: T)? + B-->>Edge: true + Edge->>B: execute(data: T, ctx) + B->>Edge: result +``` + +#### 2. 
Fan-out (1:N) +``` + ┌─────────────┐ + ┌───▶│ Executor B │ +┌────────────┤ │ Input: T │ +│ Executor A │ └─────────────┘ +│ Output: T │ +└────────────┤ ┌─────────────┐ + └───▶│ Executor C │ + │ Input: T │ + └─────────────┘ +``` + +**Sequence:** +```mermaid +sequenceDiagram + participant A as Executor A + participant B as Executor B + participant C as Executor C + + A->>A: _execute() produces message + par Send to B + A->>B: send_message(data) + and Send to C + A->>C: send_message(data) + end + par Execute concurrently + B->>B: _execute(data) + and + C->>C: _execute(data) + end +``` + +#### 3. Fan-in with Message Accumulation (N:1) +``` +┌─────────────┐ +│ Executor A │───┐ +│ Output: T₁ │ │ ┌─────────────────┐ +└─────────────┘ │ │ │ + ├──▶│ Executor D │ +┌─────────────┐ │ │ Input: [T₁,T₂] │ +│ Executor B │───┘ │ │ +│ Output: T₂ │ └─────────────────┘ +└─────────────┘ +``` + +**Key Behavior**: Messages accumulate in shared state until ALL configured edges in the fan-in group have contributed messages, then delivered as a collection. If any edge never contributes (due to type mismatch, condition failure, or no output), the fan-in won't execute. + +**Sequence:** +```mermaid +sequenceDiagram + participant A as Executor A + participant B as Executor B + participant SS as Shared State + participant D as Executor D + + A->>SS: store message from A + Note over SS: Wait for all configured edges + B->>SS: store message from B + Note over SS: All configured edges contributed + SS->>D: execute([msg_A, msg_B]) + D->>D: _execute([msg_A, msg_B]) +``` + +#### 4. 
Conditional Routing +``` + condition=λx: x.priority=="high" + ┌──────────────────────────────────┐ +┌────────┤ ▼ +│Router │ ┌─────────────────┐ +│ │ │ High Priority │ +│ │ │ Handler │ +└────────┤ └─────────────────┘ + │ condition=λx: x.priority=="low" + └──────────────────────────────────┐ + ▼ + ┌─────────────────┐ + │ Low Priority │ + │ Handler │ + └─────────────────┘ +``` + +**Sequence:** +```mermaid +sequenceDiagram + participant Router + participant HighHandler as High Priority Handler + participant LowHandler as Low Priority Handler + + Router->>Router: _execute() produces message + alt message.priority == "high" + Router->>HighHandler: send_message(high_priority_msg) + HighHandler->>HighHandler: _execute(msg) + else message.priority == "low" + Router->>LowHandler: send_message(low_priority_msg) + LowHandler->>LowHandler: _execute(msg) + else no condition matches + Note over Router: Message ignored + end +``` ### Concurrency Model -- Executors within a superstep run concurrently -- Messages from different sources are delivered in parallel -- Shared state access is protected by locks -- Event collection is thread-safe +- **Superstep Isolation**: All executors in a superstep run concurrently +- **Message Delivery**: Parallel delivery to all matching edges +- **Shared State**: Thread-safe access with atomic operations +- **Event Collection**: Lock-free event streaming ## API Design @@ -301,23 +448,27 @@ workflow = ( Executors take turns in a predefined sequence. 
```python -class RoundRobinGroupChatManager(Executor[list[ChatMessageContent]]): - def __init__(self, members: list[str], max_rounds: int): - self._members = members - self._current_round = 0 - self._max_rounds = max_rounds - - async def _execute(self, data: list[ChatMessageContent], - ctx: ExecutorContext) -> AgentSelectionDecision | None: - if self._current_round >= self._max_rounds: - await ctx.add_event(WorkflowCompletedEvent(data=self._chat_history)) - return None - - selection = self._members[self._current_round % len(self._members)] - decision = AgentSelectionDecision(messages=data, selection=selection) - await ctx.send_message(decision) - self._current_round += 1 - return decision +# Building the group chat workflow +executor_a = AgentExecutor(id="agent_a") +executor_b = AgentExecutor(id="agent_b") +executor_c = AgentExecutor(id="agent_c") + +group_chat_manager = RoundRobinGroupChatManager( + members=["agent_a", "agent_b", "agent_c"], + max_rounds=3 +) + +workflow = ( + WorkflowBuilder() + .set_start_executor(group_chat_manager) + .add_loop(group_chat_manager, executor_a, + condition=lambda x: x.selection == "agent_a") + .add_loop(group_chat_manager, executor_b, + condition=lambda x: x.selection == "agent_b") + .add_loop(group_chat_manager, executor_c, + condition=lambda x: x.selection == "agent_c") + .build() +) ``` ### 3. Map-Reduce Pattern @@ -344,7 +495,7 @@ Dynamic routing based on message content. ```python workflow = ( WorkflowBuilder() - .add_edge(classifier, high_priority_handler, + .add_edge(classifier, high_priority_handler, lambda msg: msg.priority == "high") .add_edge(classifier, normal_handler, lambda msg: msg.priority == "normal") @@ -402,12 +553,12 @@ Thread-safe key-value store accessible to all executors. 
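Such a store can be sketched with a single `asyncio.Lock` guarding a dictionary; this is an illustrative stand-in for the interface described here, not the framework's `_SharedState` implementation. Note that `hold()` yields the raw mapping so that multi-step updates stay atomic without re-acquiring the (non-reentrant) lock:

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any

class SharedState:
    """Minimal asyncio-safe key-value store (illustrative sketch)."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._lock = asyncio.Lock()

    async def set(self, key: str, value: Any) -> None:
        async with self._lock:
            self._data[key] = value

    async def get(self, key: str) -> Any:
        async with self._lock:
            return self._data.get(key)

    async def has(self, key: str) -> bool:
        async with self._lock:
            return key in self._data

    @asynccontextmanager
    async def hold(self):
        # Hold the lock across several operations; yield the mapping
        # directly so callers don't deadlock on the same lock.
        async with self._lock:
            yield self._data

async def demo() -> int:
    state = SharedState()
    await state.set("counter", 1)
    async with state.hold() as data:
        data["counter"] += 1  # atomic read-modify-write
    return await state.get("counter")

print(asyncio.run(demo()))  # 2
```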
```python class _SharedState: """Thread-safe shared state management""" - + async def set(self, key: str, value: Any) -> None async def get(self, key: str) -> Any async def has(self, key: str) -> bool async def delete(self, key: str) -> None - + @asynccontextmanager async def hold(self): """Hold lock for multiple operations""" @@ -420,10 +571,10 @@ class StatefulExecutor(Executor[str]): async def _execute(self, data: str, ctx: ExecutorContext) -> None: # Read from shared state counter = await ctx.get_shared_state("counter") or 0 - + # Update shared state await ctx.set_shared_state("counter", counter + 1) - + # Atomic multi-operation update async with ctx._shared_state.hold(): value1 = await ctx.get_shared_state("key1") @@ -436,6 +587,7 @@ class StatefulExecutor(Executor[str]): ### Design Approach The framework supports human intervention through: + 1. Special HIL executors that emit `HumanInTheLoopEvent` 2. Workflow suspension while awaiting human input 3. Targeted message delivery to resume execution @@ -447,8 +599,8 @@ class HumanInTheLoopExecutor(Executor[list[ChatMessageContent]]): def __init__(self): super().__init__() self._awaiting_input = False - - async def _execute(self, data: list[ChatMessageContent], + + async def _execute(self, data: list[ChatMessageContent], ctx: ExecutorContext) -> list[ChatMessageContent] | None: if not self._awaiting_input: # Request human input @@ -474,7 +626,7 @@ while True: else: # Normal execution events = workflow.run_stream(initial_message) - + async for event in events: if isinstance(event, HumanInTheLoopEvent): hil_event = event @@ -535,7 +687,7 @@ class StreamingAgentExecutor(Executor[str]): async def _execute(self, prompt: str, ctx: ExecutorContext): async for chunk in self.agent.stream(prompt): await ctx.add_event(AgentRunStreamingEvent(self.id, chunk)) - + final_response = await self.agent.get_final() await ctx.send_message(final_response) ``` @@ -543,21 +695,25 @@ class StreamingAgentExecutor(Executor[str]): ## 
Security Considerations ### 1. Type Safety + - Strong typing prevents type confusion attacks - Runtime type validation catches mismatched messages - Generic type parameters enforce compile-time safety ### 2. State Isolation + - Executors cannot directly access each other's state - Shared state requires explicit key-based access - No global mutable state outside controlled interfaces ### 3. Message Validation + - All messages are validated against executor input types - Conditional routing provides additional filtering - Malformed messages are rejected at edge boundaries ### 4. Resource Limits + - Maximum iteration count prevents infinite loops - Timeout support for long-running executors (planned) - Memory usage bounded by message queue size @@ -565,21 +721,25 @@ class StreamingAgentExecutor(Executor[str]): ## Performance Considerations ### 1. Concurrency + - Superstep model enables parallel executor execution - Message delivery within superstep is concurrent - Async/await throughout for non-blocking I/O ### 2. Memory Efficiency + - Messages are passed by reference where possible - Event streaming prevents memory accumulation - Lazy evaluation of conditional edges ### 3. Scalability + - O(E + V) complexity per superstep (edges + vertices) - Linear scaling with number of messages - Shared state operations are O(1) average case ### 4. Optimization Opportunities + - Edge pre-computation for static workflows - Message batching for high-throughput scenarios - Executor pooling for stateless processors @@ -587,6 +747,7 @@ class StreamingAgentExecutor(Executor[str]): ## Future Enhancements ### 1. Checkpointing and Recovery + ```python class CheckpointProvider(Protocol): async def save_checkpoint(self, workflow_id: str, state: WorkflowState) -> str @@ -595,23 +756,27 @@ class CheckpointProvider(Protocol): ``` ### 2. 
Distributed Execution + - Support for executor distribution across nodes - Message passing via message queues - Distributed shared state with consistency guarantees ### 3. Advanced Patterns + - Sub-workflow composition - Dynamic executor instantiation - Recursive workflow structures - Time-based triggers and delays ### 4. Observability Enhancements + - OpenTelemetry integration - Structured logging with correlation IDs - Performance metrics and profiling - Visual workflow debugging tools ### 5. Template System + ```python # Planned template system for reusable patterns template = WorkflowTemplate("map_reduce") @@ -624,6 +789,7 @@ workflow = template.instantiate(mapper_count=5, reducer_count=3) ``` ### 6. Error Handling and Retry + - Configurable retry policies per executor - Dead letter queues for failed messages - Circuit breaker pattern support @@ -633,4 +799,4 @@ workflow = template.instantiate(mapper_count=5, reducer_count=3) The Semantic Kernel Workflow Framework provides a robust foundation for building complex AI-powered workflows. Its type-safe, event-driven architecture combined with flexible execution patterns makes it suitable for a wide range of applications from simple sequential processing to complex multi-agent orchestrations with human oversight. -The framework's design prioritizes developer experience through its fluent API while maintaining the flexibility needed for advanced use cases. As the framework evolves, planned enhancements around distributed execution, checkpointing, and advanced patterns will further expand its capabilities while maintaining the core principles of type safety, observability, and extensibility. \ No newline at end of file +The framework's design prioritizes developer experience through its fluent API while maintaining the flexibility needed for advanced use cases. 
As the framework evolves, planned enhancements around distributed execution, checkpointing, and advanced patterns will further expand its capabilities while maintaining the core principles of type safety, observability, and extensibility. From db9db69c5c3652a6047c008b35d62725327bcad6 Mon Sep 17 00:00:00 2001 From: Ben Thomas Date: Wed, 30 Jul 2025 14:29:12 -0700 Subject: [PATCH 03/11] Added sequence diagram for fan_in with WhenAny --- docs/design/workflows_updated.md | 36 +++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/docs/design/workflows_updated.md b/docs/design/workflows_updated.md index a5ac02ba9c..efe36d4a0c 100644 --- a/docs/design/workflows_updated.md +++ b/docs/design/workflows_updated.md @@ -295,7 +295,9 @@ sequenceDiagram end ``` -#### 3. Fan-in with Message Accumulation (N:1) +#### 3. Fan-in Patterns (N:1) + +##### 3a. Fan-in with `Activation.WhenAll` (Message Accumulation) ``` ┌─────────────┐ │ Executor A │───┐ @@ -326,6 +328,38 @@ sequenceDiagram D->>D: _execute([msg_A, msg_B]) ``` +##### 3b. Fan-in with `Activation.WhenAny` (Individual Message Processing) +``` +┌─────────────┐ +│ Executor A │───┐ +│ Output: T │ │ ┌─────────────────┐ +└─────────────┘ │ │ │ + ├──▶│ Executor D │ +┌─────────────┐ │ │ Input: T │ +│ Executor B │───┘ │ │ +│ Output: T │ └─────────────────┘ +└─────────────┘ +``` + +**Key Behavior**: Each message triggers the executor independently. No accumulation occurs. If multiple messages arrive in the same superstep, the executor runs multiple times. + +**Sequence:** +```mermaid +sequenceDiagram + participant A as Executor A + participant B as Executor B + participant D as Executor D + + par Independent execution + A->>D: execute(msg_A) + D->>D: _execute(msg_A) + and + B->>D: execute(msg_B) + D->>D: _execute(msg_B) + end + Note over D: Executor runs twice +``` + #### 4. 
Conditional Routing ``` condition=λx: x.priority=="high" From 6358818b3e3b0b584853441fbf7cdba115db614c Mon Sep 17 00:00:00 2001 From: Ben Thomas Date: Fri, 1 Aug 2025 07:11:08 -0700 Subject: [PATCH 04/11] Updating workflows design --- docs/design/workflows_updated.md | 823 +++++++++++++++++++++---------- 1 file changed, 557 insertions(+), 266 deletions(-) diff --git a/docs/design/workflows_updated.md b/docs/design/workflows_updated.md index efe36d4a0c..1d3c216cdb 100644 --- a/docs/design/workflows_updated.md +++ b/docs/design/workflows_updated.md @@ -11,7 +11,7 @@ 7. [Pattern Implementation](#pattern-implementation) 8. [Event System](#event-system) 9. [State Management](#state-management) -10. [Human-in-the-Loop Support](#human-in-the-loop-support) +10. [Request/Response Support](#request-response-support) 11. [Advanced Features](#advanced-features) 12. [Security Considerations](#security-considerations) 13. [Performance Considerations](#performance-considerations) @@ -19,17 +19,16 @@ ## Executive Summary -The Semantic Kernel Workflow Framework is a sophisticated orchestration system designed to manage complex multi-agent workflows with support for various execution patterns including sequential, concurrent, conditional, and human-in-the-loop scenarios. Built on a graph-based architecture using Pregel-style execution, the framework provides a flexible and extensible foundation for building AI-powered applications. +The Agent Framework Workflow system is a sophisticated orchestration framework designed to manage complex multi-agent workflows with advanced type safety and polymorphic execution patterns. Built on a graph-based architecture using Pregel-style execution, the framework provides a flexible and extensible foundation for building AI-powered applications with natural multi-handler patterns. 
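The multi-handler dispatch this summary refers to can be sketched in a few lines. The decorator internals below are an illustrative assumption (handler discovery via method annotations), not the framework's actual code:

```python
import asyncio
import inspect

def handles_message(fn):
    """Mark a coroutine method as a handler; the expected message type
    is read from the annotation of its first non-self parameter."""
    params = list(inspect.signature(fn).parameters.values())
    fn._handles = params[1].annotation  # params[0] is `self`
    return fn

class Executor:
    def __init__(self) -> None:
        # Discover decorated handlers; reject duplicates for a type.
        self._handlers = {}
        for name in dir(self):
            member = getattr(self, name)
            handled = getattr(member, "_handles", None)
            if handled is None:
                continue
            if handled in self._handlers:
                raise TypeError(f"duplicate handler for {handled!r}")
            self._handlers[handled] = member

    async def execute(self, message):
        # Route by runtime type; error if no handler matches.
        for msg_type, handler in self._handlers.items():
            if isinstance(message, msg_type):
                return await handler(message)
        raise TypeError(f"no handler for {type(message).__name__}")

class DemoExecutor(Executor):
    @handles_message
    async def handle_text(self, data: str):
        return data.upper()

    @handles_message
    async def handle_number(self, data: int):
        return data * 2

print(asyncio.run(DemoExecutor().execute("hi")))  # HI
print(asyncio.run(DemoExecutor().execute(21)))    # 42
```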
Key features include: -- Type-safe executor-based architecture -- Asynchronous event-driven execution -- Built-in support for common patterns (sequential, fan-out/fan-in, loops) -- Human-in-the-loop capabilities -- Shared state management with thread-safe operations -- Comprehensive event streaming for observability -- Checkpointing and resumption capabilities (planned) +- **Multi-handler executors** with `@handles_message` decorator pattern +- **Built-in request/response support** with automatic correlation and external integration +- **Comprehensive type validation** preventing handler conflicts +- **Polymorphic message routing** based on runtime type checking +- **Thread-safe shared state management** with correlation tracking +- **Asynchronous event-driven execution** with comprehensive observability ## Introduction @@ -82,125 +81,256 @@ The framework follows a graph-based architecture where: ## Core Components -### 1. Executor (`executor.py`) - -The fundamental processing unit in the workflow system. 
+The workflow framework consists of six core components that work together to create a flexible, type-safe execution environment: + +```txt +┌───────────────────────────────────────────────────────────────────┐ +│ Workflow System │ +├─────────────────┬───────────────┬─────────────────────────────────┤ +│ │ │ │ +│ Executors │ Edges │ Workflow │ +│ (Processing) │ (Routing) │ (Orchestration) │ +│ │ │ │ +│ ┌─────────────┐ │ ┌───────────┐ │ ┌─────────────────────────────┐ │ +│ │@handles_msg │ │ │Conditional│ │ │ • Manages execution flow │ │ +│ │┌───────────┐│ │ │ Routing │ │ │ • Coordinates executors │ │ +│ ││Handler A ││ │ └─────┬─────┘ │ │ • Streams events │ │ +│ │├───────────┤│ │ │ │ └─────────────┬───────────────┘ │ +│ ││Handler B ││◄├───────┴───────┤► │ │ +│ │├───────────┤│ │ │ ▼ │ +│ ││Handler C ││ │ Type-based │ WorkflowContext │ +│ │└───────────┘│ │ Routing │ (Shared State & Events) │ +│ └─────────────┘ │ │ │ +└─────────────────┴───────────────┴─────────────────────────────────┘ +``` -```python -class Executor(Generic[T], ABC): - """Base class for all workflow executors""" +### Component Overview - def __init__(self, id: str | None = None): - self._id = id or str(uuid.uuid4()) - self._input_type = self._extract_type_parameter() +1. **Executors**: The processing units that handle messages +2. **Edges**: Define message flow and routing rules between executors +3. **Workflow**: Orchestrates execution and manages the lifecycle +4. **WorkflowContext**: Provides shared state and event management +5. **Message Handlers**: Enable polymorphic message processing +6. **RequestInfoExecutor**: Built-in support for external integrations - @abstractmethod - async def _execute(self, data: T, ctx: ExecutorContext) -> Any: - """Execute logic to be implemented by subclasses""" +### 0. 
Message Handler Pattern - async def execute(self, data: T, ctx: ExecutorContext) -> Any: - """Wrapper that emits events and calls _execute""" +The `@handles_message` decorator transforms executors into polymorphic processors: - def can_handle(self, data: Any) -> bool: - """Type checking for incoming messages""" ``` +┌─────────────────────────┐ ┌─────────────────────────┐ +│ Traditional Way │ │ Multi-Handler Way │ +├─────────────────────────┤ ├─────────────────────────┤ +│ │ │ @handles_message │ +│ class MyExecutor: │ │ async def handle_typeA()│ +│ def can_handle(): │ ───► │ │ +│ # Complex logic │ │ @handles_message │ +│ def _execute(): │ │ async def handle_typeB()│ +│ # Big switch/if │ │ │ +│ │ │ # Automatic routing! │ +└─────────────────────────┘ └─────────────────────────┘ +``` + +**How it works:** +- Decorates methods to mark them as message handlers +- Automatically extracts the expected message type from method signature +- Framework discovers all handlers during executor initialization +- Routes messages to appropriate handler based on runtime type -**Key Features:** +### 1. Executor - The Processing Unit -- Generic type parameter for input type safety -- Automatic type extraction and validation -- Built-in event emission for observability -- Unique identifier for routing and debugging +Executors are the fundamental building blocks that process messages in a workflow: -### 2. Edge (`_edge.py`) +``` +┌────────────────────────────────────────┐ +│ Executor │ +├────────────────────────────────────────┤ +│ ID: "data_processor" │ +├────────────────────────────────────────┤ +│ Message Handlers: │ +│ • handle_text(TextData) → ProcessedText│ +│ • handle_image(ImageData) → Thumbnail │ +│ • handle_batch(List[Any]) → Report │ +├────────────────────────────────────────┤ +│ Lifecycle: │ +│ 1. Receive message │ +│ 2. Match type to handler │ +│ 3. Execute handler │ +│ 4. Send output messages │ +│ 5. 
Emit events │ +└────────────────────────────────────────┘ +``` -Represents directed connections between executors with optional routing conditions. +**Key Concepts:** -```python -class _Edge: - """Directed edge with conditional routing support""" +- **Identity**: Each executor has a unique ID for routing and debugging +- **Polymorphic**: Can handle multiple message types via different handlers +- **Type-Safe**: Validates message types before processing +- **Event Emitting**: Broadcasts lifecycle events for observability +- **Stateless**: Designed to be stateless (state managed via WorkflowContext) - def __init__(self, source: Executor, target: Executor, - condition: Callable[[Any], bool] | None = None): - self.source = source - self.target = target - self._condition = condition - self._edge_group_ids: list[str] = [] -``` +### 2. Edge - The Message Highway -**Key Features:** +Edges define how messages flow between executors: -- Conditional routing based on message content -- Edge groups for fan-in synchronization -- Type-aware message filtering -- Support for complex routing patterns +``` +┌─────────────┐ ┌─────────────┐ +│ Executor A │ │ Executor B │ +│ │ Edge Rules: │ │ +│ Output: │ 1. Type Check │ Input: │ +│ UserData │ 2. Condition? │ UserData │ +│ │ 3. Route Message │ │ +└──────┬──────┘ └──────▲──────┘ + │ │ + │ ┌──────────────┐ │ + └────────►│ Edge │─────────┘ + │ │ + │ if user.age │ + │ >= 18 │ + └──────────────┘ +``` -### 3. Workflow (`workflow.py`) +**Edge Capabilities:** -The main orchestration container that manages execution. +- **Type Filtering**: Only routes messages the target can handle +- **Conditional Logic**: Optional conditions for dynamic routing +- **Fan-out Support**: One source can connect to multiple targets +- **Fan-in Support**: Multiple sources can connect to one target +- **Edge Groups**: Coordinate message collection for fan-in patterns -```python -class Workflow: - """Workflow container managing executors and execution""" +### 3. 
Workflow - The Orchestrator - def __init__(self, edges: list[_Edge], start_executor: Executor | str, - execution_context: ExecutionContext): - self._edges = edges - self._start_executor = start_executor - self._runner = _Runner(edges, shared_state, execution_context) +The Workflow ties everything together and manages execution: - async def run_stream(self, message: Any, - executor: Executor | str | None = None) -> AsyncIterable[WorkflowEvent]: - """Stream execution events as the workflow runs""" +``` +┌─────────────────────────────────────────────────────┐ +│ Workflow │ +├─────────────────────────────────────────────────────┤ +│ Components: │ +│ • Executors: [A, B, C, RequestInfo*] │ +│ • Edges: [A→B, A→C, B→C] │ +│ • Start: A │ +│ • Runner: Pregel-style superstep execution │ +├─────────────────────────────────────────────────────┤ +│ Execution Flow: │ +│ 1. run_stream(message) ──► Start at executor A │ +│ 2. Superstep 1: A processes, sends to B & C │ +│ 3. Superstep 2: B & C process in parallel │ +│ 4. Stream events throughout execution │ +│ 5. Complete when no messages remain │ +└─────────────────────────────────────────────────────┘ + * RequestInfo executor added automatically ``` -### 4. WorkflowBuilder (`workflow.py`) - -Fluent API for constructing workflows. 
+**Workflow Responsibilities:** -```python -class WorkflowBuilder: - """Builder pattern for workflow construction""" +- **Graph Management**: Maintains the executor graph structure +- **Execution Control**: Initiates and monitors workflow runs +- **Event Streaming**: Provides real-time execution visibility +- **Request/Response**: Built-in support for external integrations +- **Automatic Enhancement**: Adds RequestInfoExecutor if not present - def add_edge(self, source: Executor, target: Executor, - condition: Callable[[Any], bool] | None = None) -> Self - def add_fan_out_edges(self, source: Executor, targets: list[Executor]) -> Self - def add_fan_in_edges(self, sources: list[Executor], target: Executor, - activation: Activation = Activation.WhenAll) -> Self - def add_loop(self, source: Executor, target: Executor, - condition: Callable[[Any], bool] | None = None) -> Self - def add_chain(self, executors: list[Executor]) -> Self -``` +### 4. WorkflowBuilder - The Construction API -### 5. Runner (`_runner.py`) +Provides a fluent interface for building workflows: -Internal component managing the Pregel-style execution. +``` +┌─────────────────────────────────────────────────────┐ +│ WorkflowBuilder Patterns │ +├─────────────────────────────────────────────────────┤ +│ │ +│ Sequential: A ──► B ──► C │ +│ .add_chain([A, B, C]) │ +│ │ +│ Fan-out: ┌──► B │ +│ A ─┼──► C │ +│ └──► D │ +│ .add_fan_out_edges(A, [B, C, D]) │ +│ │ +│ Conditional: ┌─[if x>0]─► B │ +│ A ─┤ │ +│ └─[if x<0]─► C │ +│ .add_edge(A, B, lambda x: x > 0) │ +│ .add_edge(A, C, lambda x: x < 0) │ +│ │ +│ Fan-in: A ─┐ │ +│ B ─┼──► D │ +│ C ─┘ │ +│ .add_fan_in_edges([A, B, C], D) │ +└─────────────────────────────────────────────────────┘ +``` -```python -class _Runner: - """Manages superstep-based workflow execution""" +### 5. 
WorkflowContext - The Shared Environment - async def run_until_convergence(self) -> AsyncIterable[WorkflowEvent]: - """Run supersteps until no messages remain""" +Provides executors with access to shared state and event emission: - async def _run_iteration(self): - """Execute one superstep of message delivery""" +``` +┌─────────────────────────────────────────────────────┐ +│ WorkflowContext │ +├─────────────────────────────────────────────────────┤ +│ │ +│ Shared State: Event Stream: │ +│ ┌─────────────┐ ┌──────────────────┐ │ +│ │ counter: 42 │ │ ExecutorInvoke │ │ +│ │ user: {...} │ │ ExecutorComplete │ │ +│ │ cache: [...]│ │ RequestInfo │ │ +│ └─────────────┘ │ WorkflowComplete │ │ +│ └──────────────────┘ │ +│ │ +│ Methods: │ +│ • send_message(msg) - Route to next executors │ +│ • add_event(event) - Emit to event stream │ +│ • get/set_shared_state(key, value) - Share data │ +└─────────────────────────────────────────────────────┘ ``` -### 6. ExecutionContext (`execution_context.py`) +**Context Capabilities:** -Protocol defining the execution environment interface. +- **Message Routing**: Send messages that flow along edges +- **State Management**: Thread-safe shared state between executors +- **Event Broadcasting**: Emit events for external monitoring +- **Request Correlation**: Track request/response pairs -```python -@runtime_checkable -class ExecutionContext(Protocol): - """Execution context for message passing and event handling""" +### 6. 
RequestInfoExecutor - The External Gateway + +A special built-in executor for handling external interactions: - async def send_message(self, source_id: str, message: Any) -> None - async def drain_messages(self) -> dict[str, list[Any]] - async def add_event(self, event: WorkflowEvent) -> None - async def drain_events(self) -> list[WorkflowEvent] ``` +┌─────────────────────────────────────────────────────┐ +│ RequestInfoExecutor │ +│ (ID: "request_info") │ +├─────────────────────────────────────────────────────┤ +│ │ +│ Workflow External World │ +│ │ ▲ │ +│ │ UserApprovalRequest │ │ +│ ▼ │ │ +│ ┌──────────────┐ RequestInfoEvent │ │ +│ │ RequestInfo │ ─────────────────────►│ │ +│ │ Executor │ (request_id: 123)│ │ +│ │ │ │ │ +│ │ │◄───────────────────────┘ │ +│ └──────────────┘ send_response(true, 123) │ +│ │ │ +│ │ true │ +│ ▼ │ +│ Continue... │ +└─────────────────────────────────────────────────────┘ +``` + +**How it works:** + +1. **Intercepts Requests**: Catches any `RequestMessage` subclass +2. **Generates Correlation ID**: Creates unique request_id +3. **Emits Event**: Sends RequestInfoEvent for external handling +4. **Waits for Response**: External system calls `workflow.send_response()` +5. 
**Continues Flow**: Response routed back through workflow edges + +**Use Cases:** +- Human approval workflows +- External API calls +- Database lookups +- Any async external integration ## Execution Model @@ -209,9 +339,10 @@ class ExecutionContext(Protocol): The workflow framework implements a type-safe message passing system with the following key principles: #### Executor Type System -- **Single Input Type**: Each executor declares exactly one input type via generic parameter `Executor[T]` -- **Any Output Type**: Executors can produce any output type from their `_execute()` method -- **Type Validation**: Messages are routed only if `target.can_handle(data)` returns true +- **Multiple Input Types**: Executors can handle multiple types via `@handles_message` decorated methods +- **Handler Discovery**: Types automatically detected from method signatures +- **Type Validation**: Messages routed to appropriate handlers, raises error if no handler found +- **Conflict Prevention**: Validation prevents multiple handlers for same type #### Edge Routing Logic Messages flow along edges based on a two-step validation: @@ -227,12 +358,12 @@ The framework uses a modified Pregel execution model with clear data flow semant ``` Superstep N: ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ -│ Collect All │───▶│ Route Messages │───▶│ Execute All │ +│ Collect All │───▶│ Route Messages │───▶│ Execute All │ │ Pending │ │ Based on Type │ │ Target │ │ Messages │ │ & Conditions │ │ Executors │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ -┌─────────────────┐ ┌─────────────────┐ │ +┌─────────────────┐ ┌─────────────────┐ │ │ Start Next │◀───│ Emit Events & │◀────────────┘ │ Superstep │ │ New Messages │ └─────────────────┘ └─────────────────┘ @@ -282,22 +413,22 @@ sequenceDiagram participant B as Executor B participant C as Executor C - A->>A: _execute() produces message + A->>A: handler method produces message par Send to B A->>B: send_message(data) and Send to 
C A->>C: send_message(data) end par Execute concurrently - B->>B: _execute(data) + B->>B: handle message via @handles_message and - C->>C: _execute(data) + C->>C: handle message via @handles_message end ``` #### 3. Fan-in Patterns (N:1) -##### 3a. Fan-in with `Activation.WhenAll` (Message Accumulation) +##### 3a. Fan-in (Message Collection) ``` ┌─────────────┐ │ Executor A │───┐ @@ -310,7 +441,7 @@ sequenceDiagram └─────────────┘ ``` -**Key Behavior**: Messages accumulate in shared state until ALL configured edges in the fan-in group have contributed messages, then delivered as a collection. If any edge never contributes (due to type mismatch, condition failure, or no output), the fan-in won't execute. +**Key Behavior**: When multiple executors connect to a single target executor that expects a list type, messages are collected and delivered as a list to the target executor's handler method. **Sequence:** ```mermaid @@ -325,10 +456,10 @@ sequenceDiagram B->>SS: store message from B Note over SS: All configured edges contributed SS->>D: execute([msg_A, msg_B]) - D->>D: _execute([msg_A, msg_B]) + D->>D: handle list via @handles_message ``` -##### 3b. Fan-in with `Activation.WhenAny` (Individual Message Processing) +##### 3b. 
Fan-in (Individual Message Processing) ``` ┌─────────────┐ │ Executor A │───┐ @@ -352,10 +483,10 @@ sequenceDiagram par Independent execution A->>D: execute(msg_A) - D->>D: _execute(msg_A) + D->>D: handle msg_A via @handles_message and B->>D: execute(msg_B) - D->>D: _execute(msg_B) + D->>D: handle msg_B via @handles_message end Note over D: Executor runs twice ``` @@ -385,13 +516,13 @@ sequenceDiagram participant HighHandler as High Priority Handler participant LowHandler as Low Priority Handler - Router->>Router: _execute() produces message + Router->>Router: handler method produces message alt message.priority == "high" Router->>HighHandler: send_message(high_priority_msg) - HighHandler->>HighHandler: _execute(msg) + HighHandler->>HighHandler: handle message via @handles_message else message.priority == "low" Router->>LowHandler: send_message(low_priority_msg) - LowHandler->>LowHandler: _execute(msg) + LowHandler->>LowHandler: handle message via @handles_message else no condition matches Note over Router: Message ignored end @@ -406,15 +537,35 @@ sequenceDiagram ## API Design -### Creating Executors +### Creating Single-Handler Executors ```python @output_message_types(str) -class UpperCaseExecutor(Executor[str]): - async def _execute(self, data: str, ctx: ExecutorContext) -> str: +class UpperCaseExecutor(Executor): + @handles_message + async def process_text(self, data: str, ctx: WorkflowContext) -> None: result = data.upper() await ctx.send_message(result) - return result +``` + +### Creating Multi-Handler Executors + +```python +@output_message_types(TemperatureResponse, WeatherResponse) +class WeatherService(Executor): + """Service handling multiple weather query types""" + + @handles_message + async def get_temperature(self, query: TemperatureQuery, ctx: WorkflowContext) -> None: + response = TemperatureResponse(location=query.location, temperature=20.0) + await ctx.send_message(response) + + @handles_message + async def get_weather(self, query: 
WeatherQuery, ctx: WorkflowContext) -> None: + response = WeatherResponse( + location=query.location, temperature=20.0, conditions="Sunny" + ) + await ctx.send_message(response) ``` ### Building Workflows @@ -442,8 +593,7 @@ workflow = ( WorkflowBuilder() .set_start_executor(splitter) .add_fan_out_edges(splitter, [worker1, worker2, worker3]) - .add_fan_in_edges([worker1, worker2, worker3], aggregator, - activation=Activation.WhenAll) + .add_fan_in_edges([worker1, worker2, worker3], aggregator) .build() ) ``` @@ -455,87 +605,136 @@ workflow = ( async for event in workflow.run_stream(initial_message): if isinstance(event, ExecutorCompleteEvent): print(f"Executor {event.executor_id} completed") - elif isinstance(event, HumanInTheLoopEvent): - # Handle human intervention request - user_input = await get_user_input() - async for event in workflow.run_stream(user_input, executor=event.executor_id): - # Process continuation events + elif isinstance(event, RequestInfoEvent): + # Handle request/response pattern + user_response = await get_user_input(event.request_data) + async for event in workflow.send_response(user_response, event.request_id): + # Process response continuation events ``` ## Pattern Implementation -### 1. Sequential Processing +### 1. Multi-Handler Pattern + +Single executor handling multiple message types. + +```python +@output_message_types(ProcessedDataA, ProcessedDataB) +class PolymorphicProcessor(Executor): + @handles_message + async def handle_type_a(self, data: InputTypeA, ctx: WorkflowContext): + result = ProcessedDataA(data.value * 2) + await ctx.send_message(result) + + @handles_message + async def handle_type_b(self, data: InputTypeB, ctx: WorkflowContext): + result = ProcessedDataB(data.text.upper()) + await ctx.send_message(result) +``` + +### 2. Request/Response Pattern + +Built-in support for external request handling. 
+ +```python +@dataclass +class UserQuery(RequestMessage): + question: str + +# Workflow automatically includes RequestInfoExecutor +workflow = WorkflowBuilder().set_start_executor(processor).build() + +# Handle request/response externally +async for event in workflow.run_stream(UserQuery("What is AI?")): + if isinstance(event, RequestInfoEvent): + response = await get_human_response(event.request_data.question) + async for event in workflow.send_response(response, event.request_id): + # Process response... +``` + +### 3. Sequential Processing Simple chain of executors processing data in order. ```python workflow = ( WorkflowBuilder() - .add_chain([preprocessor, analyzer, formatter]) + .add_chain([preprocessor, analyzer, formatter]) .set_start_executor(preprocessor) .build() ) ``` -### 2. Round-Robin Group Chat +### 4. Round-Robin Group Chat -Executors take turns in a predefined sequence. +Multi-agent coordination with conditional routing. ```python -# Building the group chat workflow -executor_a = AgentExecutor(id="agent_a") -executor_b = AgentExecutor(id="agent_b") -executor_c = AgentExecutor(id="agent_c") - -group_chat_manager = RoundRobinGroupChatManager( - members=["agent_a", "agent_b", "agent_c"], - max_rounds=3 -) - +@output_message_types(TurnSelection) +class GroupChatManager(Executor): + @handles_message + async def coordinate_chat(self, messages: list[ChatMessage], ctx: WorkflowContext): + next_agent = self._agents[self._turn_count % len(self._agents)] + selection = TurnSelection(messages=messages, selected_agent=next_agent) + await ctx.send_message(selection) + +@output_message_types(list[ChatMessage]) +class ChatAgent(Executor): + @handles_message + async def respond_when_selected(self, selection: TurnSelection, ctx: WorkflowContext): + if selection.selected_agent == self._name: + response = ChatMessage(text=f"This is {self._name} speaking!") + await ctx.send_message([response]) + +# Conditional routing based on agent selection workflow = ( 
WorkflowBuilder() - .set_start_executor(group_chat_manager) - .add_loop(group_chat_manager, executor_a, - condition=lambda x: x.selection == "agent_a") - .add_loop(group_chat_manager, executor_b, - condition=lambda x: x.selection == "agent_b") - .add_loop(group_chat_manager, executor_c, - condition=lambda x: x.selection == "agent_c") + .set_start_executor(manager) + .add_edge(manager, alice, + condition=lambda sel: sel.selected_agent == "alice") + .add_edge(manager, bob, + condition=lambda sel: sel.selected_agent == "bob") + .add_edge(alice, manager) + .add_edge(bob, manager) .build() ) ``` -### 3. Map-Reduce Pattern +### 5. Map-Reduce Pattern -Parallel processing with aggregation. +Parallel processing with aggregation using fan-out/fan-in. ```python -# Split -> Map (parallel) -> Shuffle -> Reduce (parallel) -> Aggregate +# Split -> Map (parallel) -> Reduce -> Aggregate workflow = ( WorkflowBuilder() .set_start_executor(splitter) - .add_fan_out_edges(splitter, mappers) - .add_fan_in_edges(mappers, shuffler, activation=Activation.WhenAll) - .add_fan_out_edges(shuffler, reducers) - .add_fan_in_edges(reducers, aggregator, activation=Activation.WhenAll) + .add_fan_out_edges(splitter, mappers) # Parallel processing + .add_fan_in_edges(mappers, aggregator) .build() ) ``` -### 4. Conditional Branching +### 6. Conditional Branching Dynamic routing based on message content. 
```python +@output_message_types(EmailMessage) +class SpamDetector(Executor): + @handles_message + async def analyze_email(self, content: str, ctx: WorkflowContext): + is_spam = any(keyword in content.lower() for keyword in self._spam_keywords) + await ctx.send_message(EmailMessage(content=content, is_spam=is_spam)) + +# Conditional routing based on analysis results workflow = ( WorkflowBuilder() - .add_edge(classifier, high_priority_handler, - lambda msg: msg.priority == "high") - .add_edge(classifier, normal_handler, - lambda msg: msg.priority == "normal") - .add_edge(classifier, low_priority_handler, - lambda msg: msg.priority == "low") - .set_start_executor(classifier) + .set_start_executor(detector) + .add_edge(detector, responder, + condition=lambda email: not email.is_spam) + .add_edge(detector, spam_filter, + condition=lambda email: email.is_spam) .build() ) ``` @@ -557,8 +756,20 @@ ExecutorCompleteEvent # Executor finishes processing AgentRunEvent # Agent produces final response AgentRunStreamingEvent # Agent streams partial response -# Control flow events -HumanInTheLoopEvent # Human intervention required +# Request/Response events +RequestInfoEvent # Request received with correlation ID +``` + +### RequestInfoEvent Structure + +```python +@dataclass +class RequestInfoEvent(WorkflowEvent): + """Event emitted when RequestInfoExecutor processes a request""" + request_id: str # Unique correlation ID + source_executor_id: str # ID of the executor that sent the request + request_type: str # Type name of request message + request_data: RequestMessage # The actual request data ``` ### Event Handling @@ -568,12 +779,14 @@ Events are emitted during execution and streamed to consumers: ```python async for event in workflow.run_stream(message): match event: - case ExecutorCompleteEvent(executor_id=id, data=result): - logger.info(f"Executor {id} completed with result: {result}") - case HumanInTheLoopEvent(executor_id=id): - # Pause workflow for human input - 
human_response = await prompt_user() - # Resume with human input + case ExecutorCompleteEvent(executor_id=id): + logger.info(f"Executor {id} completed") + case RequestInfoEvent(request_id=rid, request_data=data): + # Handle external request + response = await get_external_response(data) + # Resume with response + async for event in workflow.send_response(response, rid): + # Process continuation case WorkflowCompletedEvent(data=final_result): return final_result ``` @@ -601,8 +814,9 @@ class _SharedState: ### Usage in Executors ```python -class StatefulExecutor(Executor[str]): - async def _execute(self, data: str, ctx: ExecutorContext) -> None: +class StatefulExecutor(Executor): + @handles_message + async def process_data(self, data: str, ctx: WorkflowContext) -> None: # Read from shared state counter = await ctx.get_shared_state("counter") or 0 @@ -616,58 +830,60 @@ class StatefulExecutor(Executor[str]): await ctx.set_shared_state("combined", value1 + value2) ``` -## Human-in-the-Loop Support +## Request/Response Support ### Design Approach -The framework supports human intervention through: +The framework provides built-in request/response patterns through: -1. Special HIL executors that emit `HumanInTheLoopEvent` -2. Workflow suspension while awaiting human input -3. Targeted message delivery to resume execution +1. **RequestMessage Base Class**: All requests inherit from `RequestMessage` +2. **RequestInfoExecutor**: Automatically added to workflows +3. **Request Correlation**: Unique IDs for tracking request/response pairs +4. 
**External Integration**: Clean API for external response handling -### Implementation Example +### RequestMessage Pattern ```python -class HumanInTheLoopExecutor(Executor[list[ChatMessageContent]]): - def __init__(self): - super().__init__() - self._awaiting_input = False - - async def _execute(self, data: list[ChatMessageContent], - ctx: ExecutorContext) -> list[ChatMessageContent] | None: - if not self._awaiting_input: - # Request human input - self._awaiting_input = True - await ctx.add_event(HumanInTheLoopEvent(executor_id=self.id)) - return None +@dataclass +class UserApprovalRequest(RequestMessage): + """Request requiring human approval""" + action: str + details: str + risk_level: str + +@output_message_types() +class ApprovalProcessor(Executor): + @handles_message + async def process_approval(self, response: bool, ctx: WorkflowContext): + if response: + await ctx.add_event(WorkflowCompletedEvent("Action approved")) else: - # Process human response - self._awaiting_input = False - await ctx.send_message(data) - return data + await ctx.add_event(WorkflowCompletedEvent("Action rejected")) ``` ### Integration Pattern ```python -# Main execution loop with HIL support -hil_event = None -while True: - if hil_event: - # Resume at specific executor with human input - events = workflow.run_stream(human_input, executor=hil_event.executor_id) - else: - # Normal execution - events = workflow.run_stream(initial_message) - - async for event in events: - if isinstance(event, HumanInTheLoopEvent): - hil_event = event - human_input = await get_human_input() - break - elif isinstance(event, WorkflowCompletedEvent): - return event.data +# Workflow automatically includes RequestInfoExecutor +workflow = WorkflowBuilder().set_start_executor(processor).build() + +# Request/response handling +async for event in workflow.run_stream(UserApprovalRequest( + action="delete_files", details="Remove temp files", risk_level="low" +)): + if isinstance(event, RequestInfoEvent): + # Handle 
request externally + user_decision = await prompt_user( + f"Approve {event.request_data.action}? (y/n)" + ) + approval = user_decision.lower() == 'y' + + # Send response back to workflow + async for event in workflow.send_response(approval, event.request_id): + if isinstance(event, WorkflowCompletedEvent): + print(f"Result: {event.data}") + break + break ``` ## Advanced Features @@ -680,50 +896,81 @@ Fan-in edges can be grouped for synchronized delivery: # All three workers must complete before aggregator runs workflow = ( WorkflowBuilder() - .add_fan_in_edges([worker1, worker2, worker3], aggregator, - activation=Activation.WhenAll) + .add_fan_in_edges([worker1, worker2, worker3], aggregator) .build() ) ``` -### 2. Type System Integration +### 2. Handler Conflict Validation -The framework leverages Python's type system for safety: +Prevents runtime errors through comprehensive validation: ```python -def _is_instance_of(data: Any, target_type: type) -> bool: - """Runtime type checking supporting generics""" - # Handles Union, Optional, List, Dict, Tuple types - # Provides comprehensive type validation +def _validate_handler_conflicts(self, handlers: list[tuple[str, Any, type]]) -> None: + """Validate no conflicting message handlers exist""" + type_to_handler = {} + for method_name, method, message_type in handlers: + expanded_types = self._expand_type(message_type) + for expanded_type in expanded_types: + if expanded_type in type_to_handler: + existing = type_to_handler[expanded_type] + raise ValueError( + f"Conflicting message handlers: {existing} and {method_name} " + f"both handle {expanded_type}" + ) + type_to_handler[expanded_type] = method_name ``` -### 3. Custom Output Type Declaration +**Conflict Detection:** +- **Direct conflicts**: Two handlers for same type +- **Union conflicts**: `Union[A, B]` vs separate `A` handler +- **Subclass conflicts**: Parent and child class handlers + +### 3. 
Advanced Type Handling -Executors can declare multiple output types: +Supports Union types, generics, and subclass matching: ```python -@output_message_types(ProcessedData, ErrorReport, None) -class DataProcessor(Executor[RawData]): - async def _execute(self, data: RawData, ctx: ExecutorContext): - try: - result = process(data) - await ctx.send_message(ProcessedData(result)) - except Exception as e: - await ctx.send_message(ErrorReport(str(e))) +@output_message_types(ProcessedData, ErrorReport) +class DataProcessor(Executor): + @handles_message + async def process_data(self, data: Union[TextData, ImageData], ctx: WorkflowContext): + if isinstance(data, TextData): + result = process_text(data) + else: + result = process_image(data) + await ctx.send_message(ProcessedData(result)) + + @handles_message + async def handle_batch(self, items: list[Any], ctx: WorkflowContext): + # Handles any list type (list[str], list[int], etc.) + for item in items: + await ctx.send_message(ProcessedData(item)) ``` -### 4. Streaming Support +### 4. 
Type-Safe Message Routing -Built-in support for streaming responses: +Automatic routing based on runtime type checking: ```python -class StreamingAgentExecutor(Executor[str]): - async def _execute(self, prompt: str, ctx: ExecutorContext): - async for chunk in self.agent.stream(prompt): - await ctx.add_event(AgentRunStreamingEvent(self.id, chunk)) - - final_response = await self.agent.get_final() - await ctx.send_message(final_response) +def _get_handler_for_type(self, message_type: type) -> Callable | None: + """Find appropriate handler for message type""" + # Direct type match + if message_type in self._message_handlers: + return self._message_handlers[message_type] + + # Generic type matching (list matches list[str]) + for handler_type, handler in self._message_handlers.items(): + if hasattr(handler_type, '__origin__') and hasattr(message_type, '__origin__'): + if handler_type.__origin__ == message_type.__origin__: + return handler + + # Subclass matching + for handler_type, handler in self._message_handlers.items(): + if isinstance(handler_type, type) and issubclass(message_type, handler_type): + return handler + + return None ``` ## Security Considerations @@ -778,51 +1025,87 @@ class StreamingAgentExecutor(Executor[str]): - Message batching for high-throughput scenarios - Executor pooling for stateless processors -## Future Enhancements +## Current Advanced Features + +### 1. Polymorphic Executors -### 1. 
Checkpointing and Recovery +Single executors handling multiple message types with automatic routing: ```python -class CheckpointProvider(Protocol): - async def save_checkpoint(self, workflow_id: str, state: WorkflowState) -> str - async def load_checkpoint(self, checkpoint_id: str) -> WorkflowState - async def list_checkpoints(self, workflow_id: str) -> list[CheckpointInfo] +@output_message_types(ResponseA, ResponseB, ResponseC) +class MultiServiceExecutor(Executor): + @handles_message + async def handle_query_a(self, query: QueryA, ctx: WorkflowContext): + response = ResponseA(result=self.process_a(query)) + await ctx.send_message(response) + + @handles_message + async def handle_query_b(self, query: QueryB, ctx: WorkflowContext): + response = ResponseB(result=self.process_b(query)) + await ctx.send_message(response) + + @handles_message + async def handle_batch(self, queries: list[Union[QueryA, QueryB]], ctx: WorkflowContext): + for query in queries: + # Automatically routes to appropriate handler + await self.execute(query, ctx) ``` -### 2. Distributed Execution +### 2. Request Correlation and External Integration -- Support for executor distribution across nodes -- Message passing via message queues -- Distributed shared state with consistency guarantees +Seamless external API and human-in-the-loop integration: + +```python +# Built-in correlation with unique IDs +request_id = str(uuid.uuid4()) +await ctx.set_shared_state(f"request:{request_id}", data) + +# Events emitted for external handling +await ctx.add_event(RequestInfoEvent( + request_id=request_id, + request_type=type(data).__name__, + request_data=data +)) + +# Clean response API +async for event in workflow.send_response(user_input, request_id): + # Process response continuation +``` + +### 3. 
Type Safety and Validation + +Comprehensive type checking and conflict prevention: + +- **Handler Discovery**: Automatic type extraction from method signatures +- **Conflict Detection**: Prevents overlapping Union types and duplicate handlers +- **Runtime Validation**: Type-aware message routing with subclass support +- **Generic Type Support**: Handles `list[T]`, `dict[K,V]`, `Union[A,B]` patterns + +### 4. Comprehensive Observability + +Built-in observability and debugging capabilities: + +- **Event Streaming**: Real-time workflow execution events +- **Request Tracking**: Correlation IDs for request/response patterns +- **Type Validation**: Clear error messages for handler conflicts +- **Execution Flow**: Detailed executor invoke/complete events + +## Future Enhancements -### 3. Advanced Patterns +### 1. Distributed Execution -- Sub-workflow composition -- Dynamic executor instantiation -- Recursive workflow structures -- Time-based triggers and delays +- Support for executor distribution across nodes +- Message passing via message queues +- Distributed shared state with consistency guarantees -### 4. Observability Enhancements +### 2. Enhanced Observability - OpenTelemetry integration - Structured logging with correlation IDs - Performance metrics and profiling - Visual workflow debugging tools -### 5. Template System - -```python -# Planned template system for reusable patterns -template = WorkflowTemplate("map_reduce") - .with_parameter("mapper_count", type=int, default=3) - .with_parameter("reducer_count", type=int, default=2) - .with_pattern(MapReducePattern()) - .build() - -workflow = template.instantiate(mapper_count=5, reducer_count=3) -``` - -### 6. Error Handling and Retry +### 3. 
Advanced Error Handling - Configurable retry policies per executor - Dead letter queues for failed messages @@ -831,6 +1114,14 @@ workflow = template.instantiate(mapper_count=5, reducer_count=3) ## Conclusion -The Semantic Kernel Workflow Framework provides a robust foundation for building complex AI-powered workflows. Its type-safe, event-driven architecture combined with flexible execution patterns makes it suitable for a wide range of applications from simple sequential processing to complex multi-agent orchestrations with human oversight. +The Agent Framework Workflow system provides a powerful, type-safe foundation for building complex AI-powered workflows. Its multi-handler executor pattern, built-in request/response support, and comprehensive type validation make it suitable for a wide range of applications from simple sequential processing to complex multi-agent orchestrations with external integrations. + +**Key Strengths:** + +- **Polymorphic Design**: Single executors handle multiple message types with automatic routing +- **Type Safety**: Comprehensive validation prevents runtime conflicts and ensures correct message flow +- **External Integration**: Built-in request/response correlation for APIs and human-in-the-loop workflows +- **Developer Experience**: Clean, intuitive API with extensive validation and helpful error reporting +- **Extensibility**: Easy to add new handler types and message patterns -The framework's design prioritizes developer experience through its fluent API while maintaining the flexibility needed for advanced use cases. As the framework evolves, planned enhancements around distributed execution, checkpointing, and advanced patterns will further expand its capabilities while maintaining the core principles of type safety, observability, and extensibility. 
+The framework's evolution from single-type executors to polymorphic handlers represents a significant advancement in workflow orchestration, enabling more natural and maintainable multi-agent system architectures while preserving the benefits of strong typing and predictable execution patterns. From dfd501203a74440c13f7bc38597a5cbf2bff3502 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Tue, 5 Aug 2025 16:39:45 -0700 Subject: [PATCH 05/11] clean up --- docs/design/workflows_updated.md | 932 ++++--------------------------- 1 file changed, 108 insertions(+), 824 deletions(-) diff --git a/docs/design/workflows_updated.md b/docs/design/workflows_updated.md index 1d3c216cdb..1ba6ae9112 100644 --- a/docs/design/workflows_updated.md +++ b/docs/design/workflows_updated.md @@ -2,86 +2,34 @@ ## Table of Contents -1. [Executive Summary](#executive-summary) -2. [Introduction](#introduction) -3. [Architecture Overview](#architecture-overview) -4. [Core Components](#core-components) +1. [Introduction](#introduction) +2. [Core Components](#core-components) +3. [Foundation Patterns](#foundation-patterns) +4. [Request & Response Pattern](#request--response-pattern) 5. [Execution Model](#execution-model) 6. [API Design](#api-design) -7. [Pattern Implementation](#pattern-implementation) -8. [Event System](#event-system) -9. [State Management](#state-management) -10. [Request/Response Support](#request-response-support) -11. [Advanced Features](#advanced-features) -12. [Security Considerations](#security-considerations) -13. [Performance Considerations](#performance-considerations) -14. [Future Enhancements](#future-enhancements) - -## Executive Summary - -The Agent Framework Workflow system is a sophisticated orchestration framework designed to manage complex multi-agent workflows with advanced type safety and polymorphic execution patterns. 
Built on a graph-based architecture using Pregel-style execution, the framework provides a flexible and extensible foundation for building AI-powered applications with natural multi-handler patterns. - -Key features include: - -- **Multi-handler executors** with `@handles_message` decorator pattern -- **Built-in request/response support** with automatic correlation and external integration -- **Comprehensive type validation** preventing handler conflicts -- **Polymorphic message routing** based on runtime type checking -- **Thread-safe shared state management** with correlation tracking -- **Asynchronous event-driven execution** with comprehensive observability +7. [Security Considerations](#security-considerations) +8. [Future Enhancements](#future-enhancements) +9. [Conclusion](#conclusion) ## Introduction ### Purpose -The Workflow Framework serves as a middle layer between multi-agent orchestrations and the agent runtime, providing developers with a powerful abstraction for building complex AI workflows. It addresses the need for structured, observable, and maintainable agent coordination patterns. - -### Design Goals - -1. **Type Safety**: Enforce strong typing throughout the workflow pipeline -2. **Flexibility**: Support various execution patterns without framework modifications -3. **Observability**: Provide comprehensive event streaming for monitoring and debugging -4. **Extensibility**: Allow custom executors and patterns to be easily integrated -5. **Performance**: Enable concurrent execution where possible -6. **Reliability**: Support checkpointing and fault tolerance (future) - -### Target Use Cases +The Agent Framework Workflow system is a sophisticated orchestration framework designed to manage complex multi-agent workflows with advanced type safety and polymorphic execution patterns. 
Built on a graph-based architecture using [Pregel-style](https://kowshik.github.io/JPregel/pregel_paper.pdf) execution, the framework provides a flexible and extensible foundation for building AI-powered applications. -- Multi-agent group chats with various routing strategies -- Map-reduce style data processing pipelines -- Sequential task chains with conditional branching -- Human-in-the-loop decision workflows -- Complex orchestrations requiring state management - -## Architecture Overview +### Architecture Overview The framework follows a graph-based architecture where: -- **Nodes** are represented by `Executor` instances +- **Executors** are processing units that handle messages - **Edges** define the flow of data between executors - **Messages** carry typed data through the graph -- **Events** provide observability into the execution - -``` -┌─────────────┐ Edge ┌─────────────┐ -│ Executor A │──────────────▶│ Executor B │ -└─────────────┘ └─────────────┘ - │ │ - │ ┌─────────────┐ │ - └────────▶│ Executor C │◀────┘ - └─────────────┘ -``` - -### Key Architectural Decisions - -1. **Executor-Based Design**: Each processing unit is an executor with strongly typed input/output -2. **Pregel-Style Execution**: Superstep-based execution model for predictable behavior -3. **Event-Driven Communication**: Asynchronous message passing between executors -4. 
**Shared State Management**: Thread-safe shared state with atomic operations +- **Events** provide observability into the workflow execution ## Core Components -The workflow framework consists of six core components that work together to create a flexible, type-safe execution environment: +The workflow framework consists of three core layers that work together to create a flexible, type-safe execution environment: ```txt ┌───────────────────────────────────────────────────────────────────┐ @@ -104,82 +52,35 @@ The workflow framework consists of six core components that work together to cre └─────────────────┴───────────────┴─────────────────────────────────┘ ``` -### Component Overview - -1. **Executors**: The processing units that handle messages -2. **Edges**: Define message flow and routing rules between executors -3. **Workflow**: Orchestrates execution and manages the lifecycle -4. **WorkflowContext**: Provides shared state and event management -5. **Message Handlers**: Enable polymorphic message processing -6. **RequestInfoExecutor**: Built-in support for external integrations - -### 0. Message Handler Pattern - -The `@handles_message` decorator transforms executors into polymorphic processors: - -``` -┌─────────────────────────┐ ┌─────────────────────────┐ -│ Traditional Way │ │ Multi-Handler Way │ -├─────────────────────────┤ ├─────────────────────────┤ -│ │ │ @handles_message │ -│ class MyExecutor: │ │ async def handle_typeA()│ -│ def can_handle(): │ ───► │ │ -│ # Complex logic │ │ @handles_message │ -│ def _execute(): │ │ async def handle_typeB()│ -│ # Big switch/if │ │ │ -│ │ │ # Automatic routing! │ -└─────────────────────────┘ └─────────────────────────┘ -``` - -**How it works:** -- Decorates methods to mark them as message handlers -- Automatically extracts the expected message type from method signature -- Framework discovers all handlers during executor initialization -- Routes messages to appropriate handler based on runtime type - -### 1. 
Executor - The Processing Unit +### 1. Executor Executors are the fundamental building blocks that process messages in a workflow: ``` -┌────────────────────────────────────────┐ -│ Executor │ -├────────────────────────────────────────┤ -│ ID: "data_processor" │ -├────────────────────────────────────────┤ -│ Message Handlers: │ +┌─────────────────────────────────────────┐ +│ Executor │ +├─────────────────────────────────────────┤ +│ ID: "data_processor" │ +├─────────────────────────────────────────┤ +│ Message Handlers: │ │ • handle_text(TextData) → ProcessedText│ -│ • handle_image(ImageData) → Thumbnail │ -│ • handle_batch(List[Any]) → Report │ -├────────────────────────────────────────┤ -│ Lifecycle: │ -│ 1. Receive message │ -│ 2. Match type to handler │ -│ 3. Execute handler │ -│ 4. Send output messages │ -│ 5. Emit events │ -└────────────────────────────────────────┘ +│ • handle_image(ImageData) → Thumbnail │ +└─────────────────────────────────────────┘ ``` -**Key Concepts:** - -- **Identity**: Each executor has a unique ID for routing and debugging -- **Polymorphic**: Can handle multiple message types via different handlers -- **Type-Safe**: Validates message types before processing -- **Event Emitting**: Broadcasts lifecycle events for observability -- **Stateless**: Designed to be stateless (state managed via WorkflowContext) +Messages will be automatically routed to the appropriate handler based on their type. An executor cannot have multiple handlers for the same message type, ensuring type safety and clarity in message processing. -### 2. Edge - The Message Highway +### 2. Edge Edges define how messages flow between executors: ``` ┌─────────────┐ ┌─────────────┐ │ Executor A │ │ Executor B │ -│ │ Edge Rules: │ │ -│ Output: │ 1. Type Check │ Input: │ -│ UserData │ 2. Condition? │ UserData │ -│ │ 3. 
Route Message │ │ +│ │ │ │ +│ Output: │ │ Input: │ +│ UserData │ │ UserData │ +│ │ │ │ └──────┬──────┘ └──────▲──────┘ │ │ │ ┌──────────────┐ │ @@ -190,15 +91,7 @@ Edges define how messages flow between executors: └──────────────┘ ``` -**Edge Capabilities:** - -- **Type Filtering**: Only routes messages the target can handle -- **Conditional Logic**: Optional conditions for dynamic routing -- **Fan-out Support**: One source can connect to multiple targets -- **Fan-in Support**: Multiple sources can connect to one target -- **Edge Groups**: Coordinate message collection for fan-in patterns - -### 3. Workflow - The Orchestrator +### 3. Workflow The Workflow ties everything together and manages execution: @@ -207,114 +100,75 @@ The Workflow ties everything together and manages execution: │ Workflow │ ├─────────────────────────────────────────────────────┤ │ Components: │ -│ • Executors: [A, B, C, RequestInfo*] │ -│ • Edges: [A→B, A→C, B→C] │ +│ • Executors: [A, B, C, D] │ +│ • Edges: [A→B, A→C, B→D, C→D] │ │ • Start: A │ │ • Runner: Pregel-style superstep execution │ ├─────────────────────────────────────────────────────┤ │ Execution Flow: │ -│ 1. run_stream(message) ──► Start at executor A │ +│ 1. run_streaming(message) ──► Start at executor A │ │ 2. Superstep 1: A processes, sends to B & C │ │ 3. Superstep 2: B & C process in parallel │ -│ 4. Stream events throughout execution │ -│ 5. Complete when no messages remain │ +│ 4. Superstep 3: D processes results from B & C │ +│ 5. Stream events throughout execution │ +│ 6. 
Complete when no messages remain │ └─────────────────────────────────────────────────────┘ - * RequestInfo executor added automatically ``` -**Workflow Responsibilities:** - -- **Graph Management**: Maintains the executor graph structure -- **Execution Control**: Initiates and monitors workflow runs -- **Event Streaming**: Provides real-time execution visibility -- **Request/Response**: Built-in support for external integrations -- **Automatic Enhancement**: Adds RequestInfoExecutor if not present - -### 4. WorkflowBuilder - The Construction API - -Provides a fluent interface for building workflows: +## Foundation Patterns ``` ┌─────────────────────────────────────────────────────┐ -│ WorkflowBuilder Patterns │ -├─────────────────────────────────────────────────────┤ +│(1) Direct-messaging: A ──► B │ +│ │ +│ .add_edge(A, B) │ +│ │ +│(2) Sequential: A ──► B ──► C │ │ │ -│ Sequential: A ──► B ──► C │ │ .add_chain([A, B, C]) │ +│ (*Cycles are not allowed in a chain) │ │ │ -│ Fan-out: ┌──► B │ +│(3) Fan-out: ┌──► B │ │ A ─┼──► C │ │ └──► D │ │ .add_fan_out_edges(A, [B, C, D]) │ +│ (*Messages from A are sent to all B, C, D) │ │ │ -│ Conditional: ┌─[if x>0]─► B │ +│(4) Conditional: ┌─[if x>0]─► B │ │ A ─┤ │ │ └─[if x<0]─► C │ │ .add_edge(A, B, lambda x: x > 0) │ │ .add_edge(A, C, lambda x: x < 0) │ │ │ -│ Fan-in: A ─┐ │ +│(5) Fan-in: A ─┐ │ │ B ─┼──► D │ │ C ─┘ │ │ .add_fan_in_edges([A, B, C], D) │ +│ (*Messages from A, B, C are collected and │ +│ sent to D as a list when all are ready) │ └─────────────────────────────────────────────────────┘ ``` -### 5. 
WorkflowContext - The Shared Environment - -Provides executors with access to shared state and event emission: - -``` -┌─────────────────────────────────────────────────────┐ -│ WorkflowContext │ -├─────────────────────────────────────────────────────┤ -│ │ -│ Shared State: Event Stream: │ -│ ┌─────────────┐ ┌──────────────────┐ │ -│ │ counter: 42 │ │ ExecutorInvoke │ │ -│ │ user: {...} │ │ ExecutorComplete │ │ -│ │ cache: [...]│ │ RequestInfo │ │ -│ └─────────────┘ │ WorkflowComplete │ │ -│ └──────────────────┘ │ -│ │ -│ Methods: │ -│ • send_message(msg) - Route to next executors │ -│ • add_event(event) - Emit to event stream │ -│ • get/set_shared_state(key, value) - Share data │ -└─────────────────────────────────────────────────────┘ -``` - -**Context Capabilities:** - -- **Message Routing**: Send messages that flow along edges -- **State Management**: Thread-safe shared state between executors -- **Event Broadcasting**: Emit events for external monitoring -- **Request Correlation**: Track request/response pairs - -### 6. RequestInfoExecutor - The External Gateway +## Request & Response Pattern A special built-in executor for handling external interactions: ``` ┌─────────────────────────────────────────────────────┐ -│ RequestInfoExecutor │ -│ (ID: "request_info") │ -├─────────────────────────────────────────────────────┤ -│ │ -│ Workflow External World │ +│ Executor A External World │ │ │ ▲ │ -│ │ UserApprovalRequest │ │ +│ │ Request │ │ │ ▼ │ │ -│ ┌──────────────┐ RequestInfoEvent │ │ -│ │ RequestInfo │ ─────────────────────►│ │ -│ │ Executor │ (request_id: 123)│ │ +│ ┌──────────────┐ RequestInfoEvent │ │ +│ │ RequestInfo │ ─────────────────────► │ │ +│ │ Executor │ (request_id: 123) │ │ │ │ │ │ │ │ │ │◄───────────────────────┘ │ │ └──────────────┘ send_response(true, 123) │ │ │ │ -│ │ true │ +│ │ Response │ │ ▼ │ -│ Continue... 
│ +│ Executor A │ └─────────────────────────────────────────────────────┘ ``` @@ -327,6 +181,7 @@ A special built-in executor for handling external interactions: 5. **Continues Flow**: Response routed back through workflow edges **Use Cases:** + - Human approval workflows - External API calls - Database lookups @@ -334,23 +189,6 @@ A special built-in executor for handling external interactions: ## Execution Model -### Data Flow Architecture - -The workflow framework implements a type-safe message passing system with the following key principles: - -#### Executor Type System -- **Multiple Input Types**: Executors can handle multiple types via `@handles_message` decorated methods -- **Handler Discovery**: Types automatically detected from method signatures -- **Type Validation**: Messages routed to appropriate handlers, raises error if no handler found -- **Conflict Prevention**: Validation prevents multiple handlers for same type - -#### Edge Routing Logic -Messages flow along edges based on a two-step validation: -1. **Type Compatibility**: Target executor must be able to handle the message type -2. **Condition Check**: Optional edge condition (if present) must evaluate to true - -If either check fails, the message is ignored for that edge. 
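The two-step edge validation described above (type compatibility first, then the optional condition) can be sketched as follows. This is a minimal illustration only: `Edge`, `target_handled_types`, and `can_deliver` are assumed names for this sketch, not the framework's actual API.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

@dataclass
class Edge:
    """Illustrative edge: a message is delivered only if the target
    executor can handle its type AND the optional condition holds."""
    target_handled_types: tuple[type, ...]
    condition: Optional[Callable[[Any], bool]] = None

    def can_deliver(self, message: Any) -> bool:
        # Step 1: type compatibility with the target's handlers
        if not isinstance(message, self.target_handled_types):
            return False
        # Step 2: optional edge condition
        if self.condition is not None and not self.condition(message):
            return False
        return True

# Example: conditional routing of ints to two branches
positive_edge = Edge((int,), condition=lambda x: x > 0)
negative_edge = Edge((int,), condition=lambda x: x < 0)

print(positive_edge.can_deliver(5))       # True
print(negative_edge.can_deliver(5))       # False (condition fails)
print(positive_edge.can_deliver("text"))  # False (type check fails)
```

Checking the type before the condition matters: the condition never sees a message the target cannot handle, so edge predicates can safely assume the expected type.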
- ### Pregel-Style Supersteps The framework uses a modified Pregel execution model with clear data flow semantics: @@ -358,217 +196,59 @@ The framework uses a modified Pregel execution model with clear data flow semant ``` Superstep N: ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ -│ Collect All │───▶│ Route Messages │───▶│ Execute All │ +│ Collect All │───▶│ Route Messages │───▶│ Execute All │ │ Pending │ │ Based on Type │ │ Target │ │ Messages │ │ & Conditions │ │ Executors │ └─────────────────┘ └─────────────────┘ └─────────────────┘ - │ -┌─────────────────┐ ┌─────────────────┐ │ + │ +┌─────────────────┐ ┌─────────────────┐ │ │ Start Next │◀───│ Emit Events & │◀────────────┘ │ Superstep │ │ New Messages │ └─────────────────┘ └─────────────────┘ ``` -### Message Delivery Patterns - -#### 1. Direct Routing (1:1) -``` -┌─────────────┐ message ┌─────────────┐ -│ Executor A │──────────────▶│ Executor B │ -│ Output: T │ │ Input: T │ -└─────────────┘ └─────────────┘ -``` - -**Sequence:** -```mermaid -sequenceDiagram - participant A as Executor A - participant Edge as Edge A→B - participant B as Executor B - - A->>Edge: send_message(data: T) - Edge->>B: can_handle(data: T)? - B-->>Edge: true - Edge->>B: execute(data: T, ctx) - B->>Edge: result -``` - -#### 2. Fan-out (1:N) -``` - ┌─────────────┐ - ┌───▶│ Executor B │ -┌────────────┤ │ Input: T │ -│ Executor A │ └─────────────┘ -│ Output: T │ -└────────────┤ ┌─────────────┐ - └───▶│ Executor C │ - │ Input: T │ - └─────────────┘ -``` - -**Sequence:** -```mermaid -sequenceDiagram - participant A as Executor A - participant B as Executor B - participant C as Executor C - - A->>A: handler method produces message - par Send to B - A->>B: send_message(data) - and Send to C - A->>C: send_message(data) - end - par Execute concurrently - B->>B: handle message via @handles_message - and - C->>C: handle message via @handles_message - end -``` - -#### 3. Fan-in Patterns (N:1) - -##### 3a. 
Fan-in (Message Collection)
-```
-┌─────────────┐
-│ Executor A │───┐
-│ Output: T₁ │ │ ┌─────────────────┐
-└─────────────┘ │ │ │
- ├──▶│ Executor D │
-┌─────────────┐ │ │ Input: [T₁,T₂] │
-│ Executor B │───┘ │ │
-│ Output: T₂ │ └─────────────────┘
-└─────────────┘
-```
-
-**Key Behavior**: When multiple executors connect to a single target executor that expects a list type, messages are collected and delivered as a list to the target executor's handler method.
-
-**Sequence:**
-```mermaid
-sequenceDiagram
- participant A as Executor A
- participant B as Executor B
- participant SS as Shared State
- participant D as Executor D
-
- A->>SS: store message from A
- Note over SS: Wait for all configured edges
- B->>SS: store message from B
- Note over SS: All configured edges contributed
- SS->>D: execute([msg_A, msg_B])
- D->>D: handle list via @handles_message
-```
+- **Superstep Isolation**: All executors in a superstep run concurrently
+- **Message Delivery**: Parallel delivery to all matching edges

-##### 3b. Fan-in (Individual Message Processing)
-```
-┌─────────────┐
-│ Executor A │───┐
-│ Output: T │ │ ┌─────────────────┐
-└─────────────┘ │ │ │
- ├──▶│ Executor D │
-┌─────────────┐ │ │ Input: T │
-│ Executor B │───┘ │ │
-│ Output: T │ └─────────────────┘
-└─────────────┘
-```
+#### Checkpoints

-**Key Behavior**: Each message triggers the executor independently. No accumulation occurs. If multiple messages arrive in the same superstep, the executor runs multiple times.
-
-**Sequence:**
-```mermaid
-sequenceDiagram
- participant A as Executor A
- participant B as Executor B
- participant D as Executor D
-
- par Independent execution
- A->>D: execute(msg_A)
- D->>D: handle msg_A via @handles_message
- and
- B->>D: execute(msg_B)
- D->>D: handle msg_B via @handles_message
- end
- Note over D: Executor runs twice
-```
+Checkpoints can be saved between superstep boundaries to allow for recovery in case of failures. The following information will be stored:

-#### 4.
Conditional Routing -``` - condition=λx: x.priority=="high" - ┌──────────────────────────────────┐ -┌────────┤ ▼ -│Router │ ┌─────────────────┐ -│ │ │ High Priority │ -│ │ │ Handler │ -└────────┤ └─────────────────┘ - │ condition=λx: x.priority=="low" - └──────────────────────────────────┐ - ▼ - ┌─────────────────┐ - │ Low Priority │ - │ Handler │ - └─────────────────┘ -``` +- Executor states +- Workflow states +- Pending messages -**Sequence:** -```mermaid -sequenceDiagram - participant Router - participant HighHandler as High Priority Handler - participant LowHandler as Low Priority Handler - - Router->>Router: handler method produces message - alt message.priority == "high" - Router->>HighHandler: send_message(high_priority_msg) - HighHandler->>HighHandler: handle message via @handles_message - else message.priority == "low" - Router->>LowHandler: send_message(low_priority_msg) - LowHandler->>LowHandler: handle message via @handles_message - else no condition matches - Note over Router: Message ignored - end -``` +A checkpoint will be automatically created when the workflow converges (no more messages to process). A workflow can be resumed from a checkpoint, allowing for fault tolerance and long-running workflows. -### Concurrency Model +#### Transactions (Planned) -- **Superstep Isolation**: All executors in a superstep run concurrently -- **Message Delivery**: Parallel delivery to all matching edges -- **Shared State**: Thread-safe access with atomic operations -- **Event Collection**: Lock-free event streaming +If an executor fails during a superstep, messages processed during that superstep will be rolled back. Updates to shared state will not be committed. This ensures that the workflow remains consistent and avoids partial updates. 
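The superstep loop and the checkpoint contents listed above can be pictured with a small self-contained sketch. Everything here is a toy stand-in for illustration, not the framework implementation: `run_supersteps`, string executor ids, and tuple-shaped messages are all assumptions of this sketch.

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class Checkpoint:
    """Snapshot at a superstep boundary: superstep index, pending messages,
    and shared state (mirroring the stored items listed above)."""
    superstep: int
    pending: list
    state: dict

def run_supersteps(handlers: dict, edges: dict, initial: tuple, max_supersteps: int = 100):
    """Drain pending messages, execute target handlers, route outputs along
    matching edges, and record a checkpoint at every superstep boundary."""
    pending, state, checkpoints, step = [initial], {}, [], 0
    while pending and step < max_supersteps:
        checkpoints.append(Checkpoint(step, list(pending), dict(state)))
        current, pending = pending, []
        for target, payload in current:                      # execute all targets
            output = handlers[target](payload, state)
            for next_target, condition in edges.get(target, []):
                if condition is None or condition(output):   # route new messages
                    pending.append((next_target, output))
        step += 1
    checkpoints.append(Checkpoint(step, [], dict(state)))    # workflow converged
    return state, checkpoints

handlers = {
    "double": lambda value, state: value * 2,
    "store": lambda value, state: state.setdefault("result", value),
}
edges = {"double": [("store", None)]}
state, checkpoints = run_supersteps(handlers, edges, ("double", 3))
```

Running the example, `state` ends as `{"result": 6}` and three checkpoints are recorded: one before each of the two supersteps and one at convergence, which is where the automatic checkpoint described above would be taken.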
## API Design -### Creating Single-Handler Executors +### Creating Executors ```python -@output_message_types(str) -class UpperCaseExecutor(Executor): - @handles_message - async def process_text(self, data: str, ctx: WorkflowContext) -> None: - result = data.upper() - await ctx.send_message(result) -``` +class SampleExecutor(Executor): -### Creating Multi-Handler Executors + @handles_message(output_types=[str]) + async def reverse_string(self, data: str, ctx: WorkflowContext) -> None: + """Handler that handles a string and sends a string.""" + await ctx.send_message(data[::-1]) -```python -@output_message_types(TemperatureResponse, WeatherResponse) -class WeatherService(Executor): - """Service handling multiple weather query types""" + @handles_message(output_types=[int, float]) + async def handle_int(self, data: int, ctx: WorkflowContext) -> None: + """Handler that handles an integer and sends an integer and a float.""" + await ctx.send_message(int(data * 2)) + await ctx.send_message(float(data / 2)) @handles_message - async def get_temperature(self, query: TemperatureQuery, ctx: WorkflowContext) -> None: - response = TemperatureResponse(location=query.location, temperature=20.0) - await ctx.send_message(response) - - @handles_message - async def get_weather(self, query: WeatherQuery, ctx: WorkflowContext) -> None: - response = WeatherResponse( - location=query.location, temperature=20.0, conditions="Sunny" - ) - await ctx.send_message(response) + async def handle(self, data: str, ctx: WorkflowContext) -> None: + """Handler that handles a string and emits an event.""" + await ctx.add_event(WorkflowCompletedEvent(data)) ``` -### Building Workflows +### Building Workflows with a WorkflowBuilder ```python # Sequential workflow @@ -598,150 +278,28 @@ workflow = ( ) ``` -### Running Workflows - -```python -# Stream execution with event handling -async for event in workflow.run_stream(initial_message): - if isinstance(event, ExecutorCompleteEvent): - print(f"Executor 
{event.executor_id} completed") - elif isinstance(event, RequestInfoEvent): - # Handle request/response pattern - user_response = await get_user_input(event.request_data) - async for event in workflow.send_response(user_response, event.request_id): - # Process response continuation events -``` - -## Pattern Implementation - -### 1. Multi-Handler Pattern - -Single executor handling multiple message types. - -```python -@output_message_types(ProcessedDataA, ProcessedDataB) -class PolymorphicProcessor(Executor): - @handles_message - async def handle_type_a(self, data: InputTypeA, ctx: WorkflowContext): - result = ProcessedDataA(data.value * 2) - await ctx.send_message(result) - - @handles_message - async def handle_type_b(self, data: InputTypeB, ctx: WorkflowContext): - result = ProcessedDataB(data.text.upper()) - await ctx.send_message(result) -``` - -### 2. Request/Response Pattern - -Built-in support for external request handling. - -```python -@dataclass -class UserQuery(RequestMessage): - question: str - -# Workflow automatically includes RequestInfoExecutor -workflow = WorkflowBuilder().set_start_executor(processor).build() - -# Handle request/response externally -async for event in workflow.run_stream(UserQuery("What is AI?")): - if isinstance(event, RequestInfoEvent): - response = await get_human_response(event.request_data.question) - async for event in workflow.send_response(response, event.request_id): - # Process response... -``` - -### 3. Sequential Processing - -Simple chain of executors processing data in order. - -```python -workflow = ( - WorkflowBuilder() - .add_chain([preprocessor, analyzer, formatter]) - .set_start_executor(preprocessor) - .build() -) -``` - -### 4. Round-Robin Group Chat - -Multi-agent coordination with conditional routing. 
- -```python -@output_message_types(TurnSelection) -class GroupChatManager(Executor): - @handles_message - async def coordinate_chat(self, messages: list[ChatMessage], ctx: WorkflowContext): - next_agent = self._agents[self._turn_count % len(self._agents)] - selection = TurnSelection(messages=messages, selected_agent=next_agent) - await ctx.send_message(selection) - -@output_message_types(list[ChatMessage]) -class ChatAgent(Executor): - @handles_message - async def respond_when_selected(self, selection: TurnSelection, ctx: WorkflowContext): - if selection.selected_agent == self._name: - response = ChatMessage(text=f"This is {self._name} speaking!") - await ctx.send_message([response]) - -# Conditional routing based on agent selection -workflow = ( - WorkflowBuilder() - .set_start_executor(manager) - .add_edge(manager, alice, - condition=lambda sel: sel.selected_agent == "alice") - .add_edge(manager, bob, - condition=lambda sel: sel.selected_agent == "bob") - .add_edge(alice, manager) - .add_edge(bob, manager) - .build() -) -``` - -### 5. Map-Reduce Pattern - -Parallel processing with aggregation using fan-out/fan-in. +### Workflow Validation -```python -# Split -> Map (parallel) -> Reduce -> Aggregate -workflow = ( - WorkflowBuilder() - .set_start_executor(splitter) - .add_fan_out_edges(splitter, mappers) # Parallel processing - .add_fan_in_edges(mappers, aggregator) - .build() -) -``` +Upon building the workflow, the framework performs comprehensive validation: -### 6. Conditional Branching +- **EDGE_DUPLICATION**: Checks for duplicate edges between the same pair of executors. +- **TYPE_COMPATIBILITY**: Ensures that message types are compatible between connected executors. This is done by checking the output types of the source executor against the input types of the target executor using type annotations and type information added by the decorator. +- **GRAPH_CONNECTIVITY**: Ensures that all executors are reachable from the start executor. 
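The three build-time checks above can be sketched as plain functions over string executor ids and sets of accepted/produced types. This is an illustrative model only, not the framework's validator; the names and error strings are assumptions of the sketch.

```python
from collections import deque

def validate_workflow(start, edges, output_types, input_types):
    """Toy validation mirroring EDGE_DUPLICATION, TYPE_COMPATIBILITY,
    and GRAPH_CONNECTIVITY. Executors are plain string ids."""
    errors, seen = [], set()
    for src, dst in edges:
        if (src, dst) in seen:  # EDGE_DUPLICATION: same pair appears twice
            errors.append(f"EDGE_DUPLICATION: {src}->{dst}")
        seen.add((src, dst))
        # TYPE_COMPATIBILITY: source must produce a type the target accepts
        if not output_types.get(src, set()) & input_types.get(dst, set()):
            errors.append(f"TYPE_COMPATIBILITY: {src}->{dst}")
    # GRAPH_CONNECTIVITY: every executor must be reachable from the start
    adjacency, nodes = {}, {start}
    for src, dst in set(edges):
        adjacency.setdefault(src, []).append(dst)
        nodes.update((src, dst))
    reached, queue = {start}, deque([start])
    while queue:
        for nxt in adjacency.get(queue.popleft(), []):
            if nxt not in reached:
                reached.add(nxt)
                queue.append(nxt)
    errors.extend(f"GRAPH_CONNECTIVITY: {node}" for node in sorted(nodes - reached))
    return errors

edges = [("a", "b"), ("a", "b"), ("a", "c"), ("d", "c")]
errors = validate_workflow("a", edges, {"a": {str}, "d": {str}}, {"b": {str}, "c": {int}})
```

Here the duplicated `a->b` edge, the `str`-to-`int` mismatch on `a->c`, and the unreachable `d` are each reported, while the well-typed `a->b` edge passes.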
-Dynamic routing based on message content. +### Running Workflows ```python -@output_message_types(EmailMessage) -class SpamDetector(Executor): - @handles_message - async def analyze_email(self, content: str, ctx: WorkflowContext): - is_spam = any(keyword in content.lower() for keyword in self._spam_keywords) - await ctx.send_message(EmailMessage(content=content, is_spam=is_spam)) +# Streaming +async for event in workflow.run_streaming(initial_message): + if isinstance(event, WorkflowCompletedEvent): + print(f"Workflow completed with result: {event.data}") -# Conditional routing based on analysis results -workflow = ( - WorkflowBuilder() - .set_start_executor(detector) - .add_edge(detector, responder, - condition=lambda email: not email.is_spam) - .add_edge(detector, spam_filter, - condition=lambda email: email.is_spam) - .build() -) +# Non-streaming +result = await workflow.run(initial_message) +print(f"Workflow completed with result: {result.get_completed_event().data}") ``` -## Event System - -### Event Types +### Built-in Event Types ```python # Workflow lifecycle events @@ -752,67 +310,14 @@ WorkflowCompletedEvent # Workflow reaches completion ExecutorInvokeEvent # Executor starts processing ExecutorCompleteEvent # Executor finishes processing -# Agent-specific events -AgentRunEvent # Agent produces final response -AgentRunStreamingEvent # Agent streams partial response - # Request/Response events RequestInfoEvent # Request received with correlation ID ``` -### RequestInfoEvent Structure - -```python -@dataclass -class RequestInfoEvent(WorkflowEvent): - """Event emitted when RequestInfoExecutor processes a request""" - request_id: str # Unique correlation ID - source_executor_id: str # ID of the executor that sent the request - request_type: str # Type name of request message - request_data: RequestMessage # The actual request data -``` - -### Event Handling - -Events are emitted during execution and streamed to consumers: - -```python -async for event in 
workflow.run_stream(message): - match event: - case ExecutorCompleteEvent(executor_id=id): - logger.info(f"Executor {id} completed") - case RequestInfoEvent(request_id=rid, request_data=data): - # Handle external request - response = await get_external_response(data) - # Resume with response - async for event in workflow.send_response(response, rid): - # Process continuation - case WorkflowCompletedEvent(data=final_result): - return final_result -``` - -## State Management - -### Shared State +### State Management Thread-safe key-value store accessible to all executors. -```python -class _SharedState: - """Thread-safe shared state management""" - - async def set(self, key: str, value: Any) -> None - async def get(self, key: str) -> Any - async def has(self, key: str) -> bool - async def delete(self, key: str) -> None - - @asynccontextmanager - async def hold(self): - """Hold lock for multiple operations""" -``` - -### Usage in Executors - ```python class StatefulExecutor(Executor): @handles_message @@ -830,147 +335,19 @@ class StatefulExecutor(Executor): await ctx.set_shared_state("combined", value1 + value2) ``` -## Request/Response Support - -### Design Approach - -The framework provides built-in request/response patterns through: - -1. **RequestMessage Base Class**: All requests inherit from `RequestMessage` -2. **RequestInfoExecutor**: Automatically added to workflows -3. **Request Correlation**: Unique IDs for tracking request/response pairs -4. 
**External Integration**: Clean API for external response handling - -### RequestMessage Pattern - -```python -@dataclass -class UserApprovalRequest(RequestMessage): - """Request requiring human approval""" - action: str - details: str - risk_level: str - -@output_message_types() -class ApprovalProcessor(Executor): - @handles_message - async def process_approval(self, response: bool, ctx: WorkflowContext): - if response: - await ctx.add_event(WorkflowCompletedEvent("Action approved")) - else: - await ctx.add_event(WorkflowCompletedEvent("Action rejected")) -``` - -### Integration Pattern +### Request & Response ```python -# Workflow automatically includes RequestInfoExecutor -workflow = WorkflowBuilder().set_start_executor(processor).build() - -# Request/response handling -async for event in workflow.run_stream(UserApprovalRequest( - action="delete_files", details="Remove temp files", risk_level="low" -)): +request_info_event: RequestInfoEvent | None = None +async for event in workflow.run_streaming(initial_message): if isinstance(event, RequestInfoEvent): - # Handle request externally - user_decision = await prompt_user( - f"Approve {event.request_data.action}? (y/n)" - ) - approval = user_decision.lower() == 'y' - - # Send response back to workflow - async for event in workflow.send_response(approval, event.request_id): - if isinstance(event, WorkflowCompletedEvent): - print(f"Result: {event.data}") - break - break -``` - -## Advanced Features - -### 1. Edge Groups and Synchronization - -Fan-in edges can be grouped for synchronized delivery: + request_info_event = event -```python -# All three workers must complete before aggregator runs -workflow = ( - WorkflowBuilder() - .add_fan_in_edges([worker1, worker2, worker3], aggregator) - .build() -) -``` - -### 2. 
Handler Conflict Validation - -Prevents runtime errors through comprehensive validation: - -```python -def _validate_handler_conflicts(self, handlers: list[tuple[str, Any, type]]) -> None: - """Validate no conflicting message handlers exist""" - type_to_handler = {} - for method_name, method, message_type in handlers: - expanded_types = self._expand_type(message_type) - for expanded_type in expanded_types: - if expanded_type in type_to_handler: - existing = type_to_handler[expanded_type] - raise ValueError( - f"Conflicting message handlers: {existing} and {method_name} " - f"both handle {expanded_type}" - ) - type_to_handler[expanded_type] = method_name -``` - -**Conflict Detection:** -- **Direct conflicts**: Two handlers for same type -- **Union conflicts**: `Union[A, B]` vs separate `A` handler -- **Subclass conflicts**: Parent and child class handlers - -### 3. Advanced Type Handling +async for event in workflow.send_responses_stream({request_info_event.request_id: "response_data"}): + if isinstance(event, WorkflowCompletedEvent): + result = event.data -Supports Union types, generics, and subclass matching: - -```python -@output_message_types(ProcessedData, ErrorReport) -class DataProcessor(Executor): - @handles_message - async def process_data(self, data: Union[TextData, ImageData], ctx: WorkflowContext): - if isinstance(data, TextData): - result = process_text(data) - else: - result = process_image(data) - await ctx.send_message(ProcessedData(result)) - - @handles_message - async def handle_batch(self, items: list[Any], ctx: WorkflowContext): - # Handles any list type (list[str], list[int], etc.) - for item in items: - await ctx.send_message(ProcessedData(item)) -``` - -### 4. 
Type-Safe Message Routing - -Automatic routing based on runtime type checking: - -```python -def _get_handler_for_type(self, message_type: type) -> Callable | None: - """Find appropriate handler for message type""" - # Direct type match - if message_type in self._message_handlers: - return self._message_handlers[message_type] - - # Generic type matching (list matches list[str]) - for handler_type, handler in self._message_handlers.items(): - if hasattr(handler_type, '__origin__') and hasattr(message_type, '__origin__'): - if handler_type.__origin__ == message_type.__origin__: - return handler - - # Subclass matching - for handler_type, handler in self._message_handlers.items(): - if isinstance(handler_type, type) and issubclass(message_type, handler_type): - return handler - - return None +print(f"Workflow completed with result: {result}") ``` ## Security Considerations @@ -997,98 +374,7 @@ def _get_handler_for_type(self, message_type: type) -> Callable | None: - Maximum iteration count prevents infinite loops - Timeout support for long-running executors (planned) -- Memory usage bounded by message queue size - -## Performance Considerations - -### 1. Concurrency - -- Superstep model enables parallel executor execution -- Message delivery within superstep is concurrent -- Async/await throughout for non-blocking I/O - -### 2. Memory Efficiency - -- Messages are passed by reference where possible -- Event streaming prevents memory accumulation -- Lazy evaluation of conditional edges - -### 3. Scalability - -- O(E + V) complexity per superstep (edges + vertices) -- Linear scaling with number of messages -- Shared state operations are O(1) average case - -### 4. Optimization Opportunities - -- Edge pre-computation for static workflows -- Message batching for high-throughput scenarios -- Executor pooling for stateless processors - -## Current Advanced Features - -### 1. 
Polymorphic Executors - -Single executors handling multiple message types with automatic routing: - -```python -@output_message_types(ResponseA, ResponseB, ResponseC) -class MultiServiceExecutor(Executor): - @handles_message - async def handle_query_a(self, query: QueryA, ctx: WorkflowContext): - response = ResponseA(result=self.process_a(query)) - await ctx.send_message(response) - - @handles_message - async def handle_query_b(self, query: QueryB, ctx: WorkflowContext): - response = ResponseB(result=self.process_b(query)) - await ctx.send_message(response) - - @handles_message - async def handle_batch(self, queries: list[Union[QueryA, QueryB]], ctx: WorkflowContext): - for query in queries: - # Automatically routes to appropriate handler - await self.execute(query, ctx) -``` - -### 2. Request Correlation and External Integration - -Seamless external API and human-in-the-loop integration: - -```python -# Built-in correlation with unique IDs -request_id = str(uuid.uuid4()) -await ctx.set_shared_state(f"request:{request_id}", data) - -# Events emitted for external handling -await ctx.add_event(RequestInfoEvent( - request_id=request_id, - request_type=type(data).__name__, - request_data=data -)) - -# Clean response API -async for event in workflow.send_response(user_input, request_id): - # Process response continuation -``` - -### 3. Type Safety and Validation - -Comprehensive type checking and conflict prevention: - -- **Handler Discovery**: Automatic type extraction from method signatures -- **Conflict Detection**: Prevents overlapping Union types and duplicate handlers -- **Runtime Validation**: Type-aware message routing with subclass support -- **Generic Type Support**: Handles `list[T]`, `dict[K,V]`, `Union[A,B]` patterns - -### 4. 
Comprehensive Observability - -Built-in observability and debugging capabilities: - -- **Event Streaming**: Real-time workflow execution events -- **Request Tracking**: Correlation IDs for request/response patterns -- **Type Validation**: Clear error messages for handler conflicts -- **Execution Flow**: Detailed executor invoke/complete events +- Memory usage bounded by message queue size (planned) ## Future Enhancements @@ -1123,5 +409,3 @@ The Agent Framework Workflow system provides a powerful, type-safe foundation fo - **External Integration**: Built-in request/response correlation for APIs and human-in-the-loop workflows - **Developer Experience**: Clean, intuitive API with extensive validation and helpful error reporting - **Extensibility**: Easy to add new handler types and message patterns - -The framework's evolution from single-type executors to polymorphic handlers represents a significant advancement in workflow orchestration, enabling more natural and maintainable multi-agent system architectures while preserving the benefits of strong typing and predictable execution patterns. 
From a503e1aa2ceccca5c275b26a1b5cf139a1682fac Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Tue, 5 Aug 2025 17:04:47 -0700 Subject: [PATCH 06/11] Clean up request & response contents --- docs/design/workflows_updated.md | 34 ++++++++++++++++---------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/docs/design/workflows_updated.md b/docs/design/workflows_updated.md index 1ba6ae9112..481d84ed77 100644 --- a/docs/design/workflows_updated.md +++ b/docs/design/workflows_updated.md @@ -154,22 +154,22 @@ The Workflow ties everything together and manages execution: A special built-in executor for handling external interactions: ``` -┌─────────────────────────────────────────────────────┐ -│ Executor A External World │ -│ │ ▲ │ -│ │ Request │ │ -│ ▼ │ │ -│ ┌──────────────┐ RequestInfoEvent │ │ -│ │ RequestInfo │ ─────────────────────► │ │ -│ │ Executor │ (request_id: 123) │ │ -│ │ │ │ │ -│ │ │◄───────────────────────┘ │ -│ └──────────────┘ send_response(true, 123) │ -│ │ │ -│ │ Response │ -│ ▼ │ -│ Executor A │ -└─────────────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────────────┐ +│ Executor A External World │ +│ │ ▲ │ +│ │ Request │ │ +│ ▼ │ │ +│ ┌──────────────┐ RequestInfoEvent │ │ +│ │ RequestInfo │ ─────────────────────► │ │ +│ │ Executor │ (request_id: 123) │ │ +│ │ │ │ │ +│ │ │◄───────────────────────┘ │ +│ └──────────────┘send_responses_streaming({123: response}) │ +│ │ │ +│ │ Response │ +│ ▼ │ +│ Executor A │ +└─────────────────────────────────────────────────────────────┘ ``` **How it works:** @@ -177,7 +177,7 @@ A special built-in executor for handling external interactions: 1. **Intercepts Requests**: Catches any `RequestMessage` subclass 2. **Generates Correlation ID**: Creates unique request_id 3. **Emits Event**: Sends RequestInfoEvent for external handling -4. **Waits for Response**: External system calls `workflow.send_response()` +4. 
**Waits for Response**: Application sends responses back with the same request_id
 5. **Continues Flow**: Response routed back through workflow edges

 **Use Cases:**

From 3f51cf38986b913898d0a9d89f7d10933b42ba17 Mon Sep 17 00:00:00 2001
From: Tao Chen
Date: Tue, 5 Aug 2025 17:20:12 -0700
Subject: [PATCH 07/11] Add more future enhancements

---
 docs/design/workflows_updated.md | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git a/docs/design/workflows_updated.md b/docs/design/workflows_updated.md
index 481d84ed77..4e1df339ad 100644
--- a/docs/design/workflows_updated.md
+++ b/docs/design/workflows_updated.md
@@ -378,20 +378,30 @@ print(f"Workflow completed with result: {result}")
 
 ## Future Enhancements
 
-### 1. Distributed Execution
+### 1. Templatized Workflows
+
+- Support for reusable workflow templates
+- High-level workflow definitions with parameterization and templatized WorkflowBuilder to allow for easy instantiation of common patterns
+
+### 2. Declarative Workflow Definitions
+
+- CSDL (Copilot Studio Definition Language) support for defining workflows
+- Visual workflow designer with drag-and-drop interface
+
+### 3. Crossed-platform & Distributed Execution
 
 - Support for executor distribution across nodes
 - Message passing via message queues
 - Distributed shared state with consistency guarantees
 
-### 2. Enhanced Observability
+### 4. Enhanced Observability
 
 - OpenTelemetry integration
 - Structured logging with correlation IDs
 - Performance metrics and profiling
 - Visual workflow debugging tools
 
-### 3. Advanced Error Handling
+### 5. 
Advanced Error Handling - Configurable retry policies per executor - Dead letter queues for failed messages From a9c0c210ab6a8d8af90dd995553d345a882d2b15 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 6 Aug 2025 08:04:06 -0700 Subject: [PATCH 08/11] message_handler -> handler --- docs/design/workflows_updated.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/design/workflows_updated.md b/docs/design/workflows_updated.md index 4e1df339ad..0c875bf3c4 100644 --- a/docs/design/workflows_updated.md +++ b/docs/design/workflows_updated.md @@ -40,7 +40,7 @@ The workflow framework consists of three core layers that work together to creat │ (Processing) │ (Routing) │ (Orchestration) │ │ │ │ │ │ ┌─────────────┐ │ ┌───────────┐ │ ┌─────────────────────────────┐ │ -│ │@handles_msg │ │ │Conditional│ │ │ • Manages execution flow │ │ +│ │@handler │ │ │Conditional│ │ │ • Manages execution flow │ │ │ │┌───────────┐│ │ │ Routing │ │ │ • Coordinates executors │ │ │ ││Handler A ││ │ └─────┬─────┘ │ │ • Streams events │ │ │ │├───────────┤│ │ │ │ └─────────────┬───────────────┘ │ @@ -231,18 +231,18 @@ If an executor fails during a superstep, messages processed during that superste ```python class SampleExecutor(Executor): - @handles_message(output_types=[str]) + @handler(output_types=[str]) async def reverse_string(self, data: str, ctx: WorkflowContext) -> None: """Handler that handles a string and sends a string.""" await ctx.send_message(data[::-1]) - @handles_message(output_types=[int, float]) + @handler(output_types=[int, float]) async def handle_int(self, data: int, ctx: WorkflowContext) -> None: """Handler that handles an integer and sends an integer and a float.""" await ctx.send_message(int(data * 2)) await ctx.send_message(float(data / 2)) - @handles_message + @handler async def handle(self, data: str, ctx: WorkflowContext) -> None: """Handler that handles a string and emits an event.""" await ctx.add_event(WorkflowCompletedEvent(data)) @@ 
-320,7 +320,7 @@ Thread-safe key-value store accessible to all executors. ```python class StatefulExecutor(Executor): - @handles_message + @handler async def process_data(self, data: str, ctx: WorkflowContext) -> None: # Read from shared state counter = await ctx.get_shared_state("counter") or 0 From 1526c36b43c4458724160f57b29646ceca542adc Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Wed, 6 Aug 2025 08:34:46 -0700 Subject: [PATCH 09/11] remove mention of visual designer --- docs/design/workflows_updated.md | 1 - 1 file changed, 1 deletion(-) diff --git a/docs/design/workflows_updated.md b/docs/design/workflows_updated.md index 0c875bf3c4..fe59ecf8a4 100644 --- a/docs/design/workflows_updated.md +++ b/docs/design/workflows_updated.md @@ -386,7 +386,6 @@ print(f"Workflow completed with result: {result}") ### 2. Declarative Workflow Definitions - CSDL (Copilot Studio Definition Language) support for defining workflows -- Visual workflow designer with drag-and-drop interface ### 3. Crossed-platform & Distributed Execution From 093fb5b159e7859883dc2de7f61bd1d8445dc0d3 Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Thu, 14 Aug 2025 13:44:36 -0700 Subject: [PATCH 10/11] Minor updates and rename --- ... 
agent_framework_workflow_architecture.md} | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) rename docs/design/{workflows_updated.md => agent_framework_workflow_architecture.md} (94%) diff --git a/docs/design/workflows_updated.md b/docs/design/agent_framework_workflow_architecture.md similarity index 94% rename from docs/design/workflows_updated.md rename to docs/design/agent_framework_workflow_architecture.md index fe59ecf8a4..591d9a3f78 100644 --- a/docs/design/workflows_updated.md +++ b/docs/design/agent_framework_workflow_architecture.md @@ -60,7 +60,7 @@ Executors are the fundamental building blocks that process messages in a workflo ┌─────────────────────────────────────────┐ │ Executor │ ├─────────────────────────────────────────┤ -│ ID: "data_processor" │ +│ Name: "data_processor" │ ├─────────────────────────────────────────┤ │ Message Handlers: │ │ • handle_text(TextData) → ProcessedText│ @@ -72,7 +72,7 @@ Messages will be automatically routed to the appropriate handler based on their ### 2. 
Edge -Edges define how messages flow between executors: +Edges define how messages flow between executors with optional conditions: ``` ┌─────────────┐ ┌─────────────┐ @@ -128,17 +128,26 @@ The Workflow ties everything together and manages execution: │ .add_chain([A, B, C]) │ │ (*Cycles are not allowed in a chain) │ │ │ -│(3) Fan-out: ┌──► B │ +│(3) Fan-out: │ +| ┌──► B │ │ A ─┼──► C │ │ └──► D │ │ .add_fan_out_edges(A, [B, C, D]) │ │ (*Messages from A are sent to all B, C, D) │ +│ (*With an optional selection function, │ +│ messages can be sent to only a subset of │ +│ recipients based on custom logic) │ │ │ -│(4) Conditional: ┌─[if x>0]─► B │ +│(4) Switch-case: ┌─[case: x>0]─► B │ │ A ─┤ │ -│ └─[if x<0]─► C │ -│ .add_edge(A, B, lambda x: x > 0) │ -│ .add_edge(A, C, lambda x: x < 0) │ +│ └─[default]─► C │ +│ .add_switch_case_edge_group( │ +│ source=A, │ +│ case=[ │ +│ Case(B, condition=lambda x: x > 0), │ +│ Default(C), │ +│ ], │ +│ ) │ │ │ │(5) Fan-in: A ─┐ │ │ B ─┼──► D │ @@ -178,7 +187,7 @@ A special built-in executor for handling external interactions: 2. **Generates Correlation ID**: Creates unique request_id 3. **Emits Event**: Sends RequestInfoEvent for external handling 4. **Waits for Response**: Application sends responses back with the same request_id -5. **Continues Flow**: Response routed back through workflow edges +5. **Continues Flow**: Response routed back to the original executor **Use Cases:** @@ -390,7 +399,7 @@ print(f"Workflow completed with result: {result}") ### 3. Crossed-platform & Distributed Execution - Support for executor distribution across nodes -- Message passing via message queues +- Message passing via message queues - Distributed shared state with consistency guarantees ### 4. 
Enhanced Observability From 9787245756a8b375e50e1f8a8ac18c937dc98c0d Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Thu, 14 Aug 2025 14:43:48 -0700 Subject: [PATCH 11/11] Name reformat --- ...itecture.md => agent-framework-workflow-architecture.md} | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename docs/design/{agent_framework_workflow_architecture.md => agent-framework-workflow-architecture.md} (98%) diff --git a/docs/design/agent_framework_workflow_architecture.md b/docs/design/agent-framework-workflow-architecture.md similarity index 98% rename from docs/design/agent_framework_workflow_architecture.md rename to docs/design/agent-framework-workflow-architecture.md index 591d9a3f78..e562a70d81 100644 --- a/docs/design/agent_framework_workflow_architecture.md +++ b/docs/design/agent-framework-workflow-architecture.md @@ -144,7 +144,7 @@ The Workflow ties everything together and manages execution: │ .add_switch_case_edge_group( │ │ source=A, │ │ case=[ │ -│ Case(B, condition=lambda x: x > 0), │ +│ Case(B, condition=lambda x, state: x > 0), │ │ Default(C), │ │ ], │ │ ) │ @@ -271,8 +271,8 @@ workflow = ( # Conditional routing workflow = ( WorkflowBuilder() - .add_edge(router, executor_a, lambda msg: msg.type == "A") - .add_edge(router, executor_b, lambda msg: msg.type == "B") + .add_edge(router, executor_a, lambda msg, workflow_state: msg.type == "A") + .add_edge(router, executor_b, lambda msg, workflow_state: msg.type == "B") .set_start_executor(router) .build() )
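The switch-case edge group and the two-argument condition signature introduced in the final patch (a condition receives both the message and the workflow state) can be illustrated with a plain-Python sketch of first-match routing. `Case`, `Default`, and `route` here are hypothetical stand-ins that mirror, but do not reproduce, the framework types.

```python
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class Case:
    """Route to `target` when condition(message, workflow_state) holds."""
    target: str
    condition: Callable[[Any, dict], bool]

@dataclass
class Default:
    """Fallback target used when no preceding case matches."""
    target: str

def route(message: Any, workflow_state: dict, cases: list) -> str:
    """First-match semantics: evaluate cases in order; Default always
    matches if reached."""
    for case in cases:
        if isinstance(case, Default):
            return case.target
        if case.condition(message, workflow_state):
            return case.target
    raise ValueError("no matching case and no Default provided")

cases = [Case("B", condition=lambda x, state: x > 0), Default("C")]
```

A positive message routes to `B`, while zero or negative messages fall through to the `Default` target `C`, matching the `case: x>0` / `default` diagram above.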