From 48181e764ab9cc8be1a19bbd8a7d661f13f60f29 Mon Sep 17 00:00:00 2001 From: Fede Kamelhar Date: Sun, 25 May 2025 14:16:21 -0400 Subject: [PATCH] Modularizing Event Loop --- src/strands/event_loop/event_loop.py | 211 +++++++++++++++++---------- 1 file changed, 134 insertions(+), 77 deletions(-) diff --git a/src/strands/event_loop/event_loop.py b/src/strands/event_loop/event_loop.py index db5a1b972..23d7bd0f3 100644 --- a/src/strands/event_loop/event_loop.py +++ b/src/strands/event_loop/event_loop.py @@ -28,6 +28,10 @@ logger = logging.getLogger(__name__) +MAX_ATTEMPTS = 6 +INITIAL_DELAY = 4 +MAX_DELAY = 240 # 4 minutes + def initialize_state(**kwargs: Any) -> Any: """Initialize the request state if not present. @@ -51,7 +55,7 @@ def event_loop_cycle( system_prompt: Optional[str], messages: Messages, tool_config: Optional[ToolConfig], - callback_handler: Any, + callback_handler: Callable[..., Any], tool_handler: Optional[ToolHandler], tool_execution_handler: Optional[ParallelToolExecutorInterface] = None, **kwargs: Any, @@ -130,13 +134,9 @@ def event_loop_cycle( stop_reason: StopReason usage: Any metrics: Metrics - max_attempts = 6 - initial_delay = 4 - max_delay = 240 # 4 minutes - current_delay = initial_delay # Retry loop for handling throttling exceptions - for attempt in range(max_attempts): + for attempt in range(MAX_ATTEMPTS): model_id = model.config.get("model_id") if hasattr(model, "config") else None model_invoke_span = tracer.start_model_invoke_span( parent_span=cycle_span, @@ -177,7 +177,7 @@ def event_loop_cycle( # Handle throttling errors with exponential backoff should_retry, current_delay = handle_throttling_error( - e, attempt, max_attempts, current_delay, max_delay, callback_handler, kwargs + e, attempt, MAX_ATTEMPTS, INITIAL_DELAY, MAX_DELAY, callback_handler, kwargs ) if should_retry: continue @@ -204,80 +204,35 @@ def event_loop_cycle( # If the model is requesting to use tools if stop_reason == "tool_use": - tool_uses: List[ToolUse] = [] - tool_results: List[ToolResult] = [] - invalid_tool_use_ids: List[str] = [] - - # Extract and validate tools - validate_and_prepare_tools(message, tool_uses, tool_results, invalid_tool_use_ids) - - # Check if tools are available for execution - if tool_uses: - if tool_handler is None: - raise ValueError("toolUse present but tool handler not set") - if tool_config is None: - raise ValueError("toolUse present but tool config not set") - - # Create the tool handler process callable - tool_handler_process: Callable[[ToolUse], ToolResult] = partial( - tool_handler.process, - messages=messages, - model=model, - system_prompt=system_prompt, - tool_config=tool_config, - callback_handler=callback_handler, - **kwargs, + if not tool_handler: + raise EventLoopException( + Exception("Model requested tool use but no tool handler provided"), + kwargs["request_state"], ) - # Execute tools (parallel or sequential) - run_tools( - handler=tool_handler_process, - tool_uses=tool_uses, - event_loop_metrics=event_loop_metrics, - request_state=cast(Any, kwargs["request_state"]), - invalid_tool_use_ids=invalid_tool_use_ids, - tool_results=tool_results, - cycle_trace=cycle_trace, - parent_span=cycle_span, - parallel_tool_executor=tool_execution_handler, + if tool_config is None: + raise EventLoopException( + Exception("Model requested tool use but no tool config provided"), + kwargs["request_state"], ) - # Update state for the next cycle - kwargs = prepare_next_cycle(kwargs, event_loop_metrics) - - # Create the tool result message - tool_result_message: Message = { - "role": "user", - "content": [{"toolResult": result} for result in tool_results], - } - messages.append(tool_result_message) - callback_handler(message=tool_result_message) - - if cycle_span: - tracer.end_event_loop_cycle_span( - span=cycle_span, message=message, tool_result_message=tool_result_message - ) - - # Check if we should stop the event loop - if kwargs["request_state"].get("stop_event_loop"): - event_loop_metrics.end_cycle(cycle_start_time, cycle_trace) - return ( - stop_reason, - message, - event_loop_metrics, - kwargs["request_state"], - ) - - # Recursive call to continue the conversation - return recurse_event_loop( - model=model, - system_prompt=system_prompt, - messages=messages, - tool_config=tool_config, - callback_handler=callback_handler, - tool_handler=tool_handler, - **kwargs, - ) + # Handle tool execution + return _handle_tool_execution( + stop_reason, + message, + model, + system_prompt, + messages, + tool_config, + tool_handler, + callback_handler, + tool_execution_handler, + event_loop_metrics, + cycle_trace, + cycle_span, + cycle_start_time, + kwargs, + ) # End the cycle and return results event_loop_metrics.end_cycle(cycle_start_time, cycle_trace) @@ -377,3 +332,105 @@ def prepare_next_cycle(kwargs: Dict[str, Any], event_loop_metrics: EventLoopMetr kwargs["event_loop_parent_cycle_id"] = kwargs["event_loop_cycle_id"] return kwargs + + +def _handle_tool_execution( + stop_reason: StopReason, + message: Message, + model: Model, + system_prompt: Optional[str], + messages: Messages, + tool_config: ToolConfig, + tool_handler: ToolHandler, + callback_handler: Callable[..., Any], + tool_execution_handler: Optional[ParallelToolExecutorInterface], + event_loop_metrics: EventLoopMetrics, + cycle_trace: Trace, + cycle_span: Any, + cycle_start_time: float, + kwargs: Dict[str, Any], +) -> Tuple[StopReason, Message, EventLoopMetrics, Dict[str, Any]]: + tool_uses: List[ToolUse] = [] + tool_results: List[ToolResult] = [] + invalid_tool_use_ids: List[str] = [] + + """ + Handles the execution of tools requested by the model during an event loop cycle. + + Args: + stop_reason (StopReason): The reason the model stopped generating. + message (Message): The message from the model that may contain tool use requests. + model (Model): The model provider instance. + system_prompt (Optional[str]): The system prompt instructions for the model. + messages (Messages): The conversation history messages. + tool_config (ToolConfig): Configuration for available tools. + tool_handler (ToolHandler): Handler for tool execution. + callback_handler (Callable[..., Any]): Callback for processing events as they happen. + tool_execution_handler (Optional[ParallelToolExecutorInterface]): Optional handler for parallel tool execution. + event_loop_metrics (EventLoopMetrics): Metrics tracking object for the event loop. + cycle_trace (Trace): Trace object for the current event loop cycle. + cycle_span (Any): Span object for tracing the cycle (type may vary). + cycle_start_time (float): Start time of the current cycle. + kwargs (Dict[str, Any]): Additional keyword arguments, including request state. + + Returns: + Tuple[StopReason, Message, EventLoopMetrics, Dict[str, Any]]: + - The stop reason, + - The updated message, + - The updated event loop metrics, + - The updated request state. + """ + validate_and_prepare_tools(message, tool_uses, tool_results, invalid_tool_use_ids) + + if not tool_uses: + return stop_reason, message, event_loop_metrics, kwargs["request_state"] + + tool_handler_process = partial( + tool_handler.process, + messages=messages, + model=model, + system_prompt=system_prompt, + tool_config=tool_config, + callback_handler=callback_handler, + **kwargs, + ) + + run_tools( + handler=tool_handler_process, + tool_uses=tool_uses, + event_loop_metrics=event_loop_metrics, + request_state=cast(Any, kwargs["request_state"]), + invalid_tool_use_ids=invalid_tool_use_ids, + tool_results=tool_results, + cycle_trace=cycle_trace, + parent_span=cycle_span, + parallel_tool_executor=tool_execution_handler, + ) + + kwargs = prepare_next_cycle(kwargs, event_loop_metrics) + + tool_result_message: Message = { + "role": "user", + "content": [{"toolResult": result} for result in tool_results], + } + + messages.append(tool_result_message) + callback_handler(message=tool_result_message) + + if cycle_span: + tracer = get_tracer() + tracer.end_event_loop_cycle_span(span=cycle_span, message=message, tool_result_message=tool_result_message) + + if kwargs["request_state"].get("stop_event_loop", False): + event_loop_metrics.end_cycle(cycle_start_time, cycle_trace) + return stop_reason, message, event_loop_metrics, kwargs["request_state"] + + return recurse_event_loop( + model=model, + system_prompt=system_prompt, + messages=messages, + tool_config=tool_config, + callback_handler=callback_handler, + tool_handler=tool_handler, + **kwargs, + )