feat: add streaming with tools support and resilience #94

graniet merged 2 commits into graniet:main
Conversation
Thank you for your work! @nazq
Pull request overview
This PR adds streaming support for LLM responses with tool calls, enabling real-time processing of both text and tool invocations. It implements the new chat_stream_with_tools() method across Anthropic and OpenAI-compatible providers (OpenAI, Mistral, Groq, Cohere, OpenRouter, HuggingFace), and extends resilience capabilities to streaming operations.
Key changes:
- Introduces a `StreamChunk` enum to represent the different streaming events (text, tool use start/delta/complete, done)
- Implements custom SSE parsers for the Anthropic and OpenAI-compatible streaming formats that handle tool calls
- Adds retry logic with exponential backoff to `chat_stream_struct()` and `chat_stream_with_tools()` in `ResilientLLM`
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/chat/mod.rs | Defines StreamChunk enum and adds chat_stream_with_tools() trait method with comprehensive documentation |
| src/backends/anthropic.rs | Implements Anthropic-specific streaming with tools, including SSE parser and 18 unit tests covering various streaming scenarios |
| src/backends/openai.rs | Delegates streaming with tools to the generic OpenAICompatibleProvider implementation |
| src/providers/openai_compatible.rs | Implements generic OpenAI-compatible streaming with tools, SSE parser, and 12 unit tests covering OpenAI and vLLM formats |
| src/resilient_llm.rs | Adds retry logic with exponential backoff to both chat_stream_struct() and chat_stream_with_tools() methods |
| tests/test_backends.rs | Adds 4 integration tests covering streaming with tools, text-only responses, and resilient streaming for both tools and struct modes |
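Both custom parsers sit on top of the same SSE framing: payloads arrive on `data:` lines, Anthropic interleaves `event:` lines, and OpenAI-compatible streams end with a literal `[DONE]` sentinel. A minimal, dependency-free sketch of that framing layer (the real parsers in this PR additionally decode the JSON payloads and tool-call deltas):

```rust
/// Minimal SSE framing parser: extracts `data:` payloads from a raw
/// chunk of an event stream, stopping at the `[DONE]` sentinel used by
/// OpenAI-compatible streams. JSON decoding is left to the caller.
fn parse_sse_data_lines(raw: &str) -> (Vec<String>, bool) {
    let mut payloads = Vec::new();
    let mut done = false;
    for line in raw.lines() {
        if let Some(rest) = line.strip_prefix("data:") {
            let payload = rest.trim();
            if payload == "[DONE]" {
                done = true;
                break;
            }
            if !payload.is_empty() {
                payloads.push(payload.to_string());
            }
        }
        // Lines like `event: content_block_delta` (Anthropic) or blank
        // event separators are framing only and carry no payload.
    }
    (payloads, done)
}

fn main() {
    let raw = "event: message_start\ndata: {\"type\":\"message_start\"}\n\ndata: [DONE]\n";
    let (payloads, done) = parse_sse_data_lines(raw);
    assert_eq!(payloads, vec!["{\"type\":\"message_start\"}".to_string()]);
    assert!(done);
}
```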
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
src/backends/anthropic.rs
Outdated
```rust
let anthropic_messages: Vec<AnthropicMessage> = messages
    .iter()
    .map(|m| AnthropicMessage {
        role: match m.role {
            ChatRole::User => "user",
            ChatRole::Assistant => "assistant",
        },
        content: match &m.message_type {
            MessageType::Text => vec![MessageContent {
                message_type: Some("text"),
                text: Some(&m.content),
                image_url: None,
                source: None,
                tool_use_id: None,
                tool_input: None,
                tool_name: None,
                tool_result_id: None,
                tool_output: None,
            }],
            MessageType::Pdf(raw_bytes) => {
                vec![MessageContent {
                    message_type: Some("document"),
                    text: None,
                    image_url: None,
                    source: Some(ImageSource {
                        source_type: "base64",
                        media_type: "application/pdf",
                        data: BASE64.encode(raw_bytes),
                    }),
                    tool_use_id: None,
                    tool_input: None,
                    tool_name: None,
                    tool_result_id: None,
                    tool_output: None,
                }]
            }
            MessageType::Image((image_mime, raw_bytes)) => {
                vec![MessageContent {
                    message_type: Some("image"),
                    text: None,
                    image_url: None,
                    source: Some(ImageSource {
                        source_type: "base64",
                        media_type: image_mime.mime_type(),
                        data: BASE64.encode(raw_bytes),
                    }),
                    tool_use_id: None,
                    tool_input: None,
                    tool_name: None,
                    tool_result_id: None,
                    tool_output: None,
                }]
            }
            MessageType::ImageURL(ref url) => vec![MessageContent {
                message_type: Some("image_url"),
                text: None,
                image_url: Some(ImageUrlContent { url }),
                source: None,
                tool_use_id: None,
                tool_input: None,
                tool_name: None,
                tool_result_id: None,
                tool_output: None,
            }],
            MessageType::ToolUse(calls) => calls
                .iter()
                .map(|c| MessageContent {
                    message_type: Some("tool_use"),
                    text: None,
                    image_url: None,
                    source: None,
                    tool_use_id: Some(c.id.clone()),
                    tool_input: Some(
                        serde_json::from_str(&c.function.arguments)
                            .unwrap_or(c.function.arguments.clone().into()),
                    ),
                    tool_name: Some(c.function.name.clone()),
                    tool_result_id: None,
                    tool_output: None,
                })
                .collect(),
            MessageType::ToolResult(responses) => responses
                .iter()
                .map(|r| MessageContent {
                    message_type: Some("tool_result"),
                    text: None,
                    image_url: None,
                    source: None,
                    tool_use_id: None,
                    tool_input: None,
                    tool_name: None,
                    tool_result_id: Some(r.id.clone()),
                    tool_output: Some(r.function.arguments.clone()),
                })
                .collect(),
        },
    })
    .collect();
```
The message preparation logic (converting ChatMessage to AnthropicMessage) is duplicated between chat_with_tools() (lines 380-477) and chat_stream_with_tools() (lines 709-806). This is approximately 100 lines of identical code that should be extracted into a private helper method to improve maintainability and reduce the risk of inconsistencies.
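The extraction this comment asks for (and which a follow-up commit names `convert_messages_to_anthropic()`) is a single private helper both call sites share. A shape sketch with stub types — the real signature operates on the crate's `ChatMessage`/`AnthropicMessage` types, not these stand-ins:

```rust
// Stub types standing in for the crate's real ChatMessage and
// AnthropicMessage; only the helper's shape is the point here.
struct ChatMessage {
    role: String,
    content: String,
}
struct AnthropicMessage {
    role: String,
    content: String,
}

struct Anthropic;

impl Anthropic {
    /// Shared by chat_with_tools() and chat_stream_with_tools(), so the
    /// ~100-line conversion exists in exactly one place.
    fn convert_messages_to_anthropic(messages: &[ChatMessage]) -> Vec<AnthropicMessage> {
        messages
            .iter()
            .map(|m| AnthropicMessage {
                role: m.role.clone(),
                content: m.content.clone(),
            })
            .collect()
    }
}

fn main() {
    let msgs = vec![ChatMessage {
        role: "user".into(),
        content: "hi".into(),
    }];
    let out = Anthropic::convert_messages_to_anthropic(&msgs);
    assert_eq!(out.len(), 1);
    assert_eq!(out[0].role, "user");
}
```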
src/backends/anthropic.rs
Outdated
```rust
let maybe_tool_slice: Option<&[Tool]> = tools.or(self.tools.as_deref());
let anthropic_tools = maybe_tool_slice.map(|slice| {
    slice
        .iter()
        .map(|tool| AnthropicTool {
            name: &tool.function.name,
            description: &tool.function.description,
            schema: &tool.function.parameters,
        })
        .collect::<Vec<_>>()
});

let tool_choice = match self.tool_choice {
    Some(ToolChoice::Auto) => {
        Some(HashMap::from([("type".to_string(), "auto".to_string())]))
    }
    Some(ToolChoice::Any) => Some(HashMap::from([("type".to_string(), "any".to_string())])),
    Some(ToolChoice::Tool(ref tool_name)) => Some(HashMap::from([
        ("type".to_string(), "tool".to_string()),
        ("name".to_string(), tool_name.clone()),
    ])),
    Some(ToolChoice::None) => {
        Some(HashMap::from([("type".to_string(), "none".to_string())]))
    }
    None => None,
};

let final_tool_choice = if anthropic_tools.is_some() {
    tool_choice.clone()
} else {
    None
};
```
The tool preparation and tool_choice mapping logic is duplicated between chat_with_tools() (starting around line 479) and chat_stream_with_tools() (lines 808-839). Extract this into a private helper method to avoid duplication and ensure consistency.
Suggested change:

```diff
-let maybe_tool_slice: Option<&[Tool]> = tools.or(self.tools.as_deref());
-let anthropic_tools = maybe_tool_slice.map(|slice| {
-    slice
-        .iter()
-        .map(|tool| AnthropicTool {
-            name: &tool.function.name,
-            description: &tool.function.description,
-            schema: &tool.function.parameters,
-        })
-        .collect::<Vec<_>>()
-});
-let tool_choice = match self.tool_choice {
-    Some(ToolChoice::Auto) => {
-        Some(HashMap::from([("type".to_string(), "auto".to_string())]))
-    }
-    Some(ToolChoice::Any) => Some(HashMap::from([("type".to_string(), "any".to_string())])),
-    Some(ToolChoice::Tool(ref tool_name)) => Some(HashMap::from([
-        ("type".to_string(), "tool".to_string()),
-        ("name".to_string(), tool_name.clone()),
-    ])),
-    Some(ToolChoice::None) => {
-        Some(HashMap::from([("type".to_string(), "none".to_string())]))
-    }
-    None => None,
-};
-let final_tool_choice = if anthropic_tools.is_some() {
-    tool_choice.clone()
-} else {
-    None
-};
+let (anthropic_tools, final_tool_choice) = Self::prepare_anthropic_tools_and_choice(
+    tools,
+    self.tools.as_deref(),
+    &self.tool_choice,
+);
```
Fair review, want an update to refactor?
Add chat_stream_with_tools method for streaming responses while handling tool calls.

Implemented for Anthropic and OpenAI-compatible providers with full SSE parsing support. Also adds resilience (retry with exponential backoff) to all streaming methods: chat_stream_struct and chat_stream_with_tools. Includes integration tests for resilient streaming and vLLM-compatible SSE parsing tests.
Address PR review feedback by extracting duplicated code into reusable helper methods:

- convert_messages_to_anthropic(): Converts a ChatMessage slice to the Anthropic message format, handling all message types (text, images, PDFs, tool use, tool results)
- prepare_tools_and_choice(): Prepares Anthropic tools and the tool_choice configuration from the provided tools and instance settings

This removes ~100 lines of duplicated code between chat_with_tools() and chat_stream_with_tools(), improving maintainability and reducing the risk of inconsistencies.
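The resilience the first commit describes — re-attempting a failed stream with exponentially growing delays — can be sketched with the standard library alone. The attempt count and delay values below are illustrative, not the crate's defaults:

```rust
use std::thread::sleep;
use std::time::Duration;

/// Retry `op` up to `max_attempts` times, doubling the delay between
/// attempts (exponential backoff). Illustrative values only; the real
/// ResilientLLM wraps stream establishment this way.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = base_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                sleep(delay);
                delay *= 2; // exponential backoff
                attempt += 1;
            }
        }
    }
}

fn main() {
    let mut calls = 0;
    let result: Result<&str, &str> = retry_with_backoff(3, Duration::from_millis(1), || {
        calls += 1;
        if calls < 3 { Err("transient") } else { Ok("ok") }
    });
    assert_eq!(result, Ok("ok"));
    assert_eq!(calls, 3);
}
```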
Addressed the Copilot review above.
Summary

- Adds the `StreamChunk` enum and `chat_stream_with_tools()` method for streaming responses while handling tool calls in real time
- Adds resilience (retry with exponential backoff) to `chat_stream_struct()` and `chat_stream_with_tools()` in `ResilientLLM`

Test plan

- Integration tests (`test_anthropic_resilient_chat_stream_with_tools`, `test_anthropic_resilient_chat_stream_struct`)
- `cargo clippy --lib --features full` passes
- `cargo fmt` applied

API Usage
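A hypothetical sketch of consuming the chunks a `chat_stream_with_tools()` stream yields — accumulating text and reassembling a tool call's JSON arguments from its deltas. The `StreamChunk` shape here is a local stand-in mirroring the variants described above, not the crate's actual definition:

```rust
// Stand-in enum mirroring the streaming events the PR describes
// (text, tool use start/delta/complete, done); field names are assumptions.
enum StreamChunk {
    Text(String),
    ToolUseStart { name: String },
    ToolUseDelta { arguments_fragment: String },
    ToolUseComplete,
    Done,
}

fn main() {
    // Stand-in for what a chat_stream_with_tools() stream might yield.
    let chunks = vec![
        StreamChunk::Text("Let me check the weather.".into()),
        StreamChunk::ToolUseStart { name: "get_weather".into() },
        StreamChunk::ToolUseDelta { arguments_fragment: "{\"city\":".into() },
        StreamChunk::ToolUseDelta { arguments_fragment: "\"Paris\"}".into() },
        StreamChunk::ToolUseComplete,
        StreamChunk::Done,
    ];

    let mut text = String::new();
    let mut tool_name = String::new();
    let mut tool_args = String::new();
    for chunk in chunks {
        match chunk {
            StreamChunk::Text(t) => text.push_str(&t),
            StreamChunk::ToolUseStart { name } => tool_name = name,
            // Argument deltas arrive as JSON fragments and are concatenated
            // until the tool call completes.
            StreamChunk::ToolUseDelta { arguments_fragment } => {
                tool_args.push_str(&arguments_fragment)
            }
            StreamChunk::ToolUseComplete => {}
            StreamChunk::Done => break,
        }
    }
    assert_eq!(text, "Let me check the weather.");
    assert_eq!(tool_name, "get_weather");
    assert_eq!(tool_args, "{\"city\":\"Paris\"}");
}
```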