chore(wren-ai-service): add followup sql generation reasoning#1407
Caution: Review failed. The pull request is closed.
Walkthrough
This pull request integrates a new follow-up SQL generation reasoning capability across the system. It adds a new model entry (
Actionable comments posted: 2
🧹 Nitpick comments (6)
wren-ai-service/src/web/v1/services/ask.py (2)
369-390: Validate branching logic for followup vs. standard reasoning.
When histories is nonempty, the code switches to the “followup_sql_generation_reasoning” pipeline. Otherwise, it uses “sql_generation_reasoning.” This approach is clear and maintains backward compatibility for queries without history. Make sure to test scenarios with extremely long histories or partial histories to confirm stable performance and correct reasoning output.
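The branching described above can be sketched roughly as follows. This is only an illustration: select_reasoning_pipeline and trim_histories are hypothetical helpers, not functions from ask.py, and capping the history length is an assumption about how very long histories might be handled.

```python
from typing import List, Optional, Sequence

# Pipeline names as quoted in the review comment above.
FOLLOWUP_PIPELINE = "followup_sql_generation_reasoning"
STANDARD_PIPELINE = "sql_generation_reasoning"


def select_reasoning_pipeline(histories: Optional[Sequence[object]]) -> str:
    """Hypothetical helper: use the follow-up pipeline only when history exists."""
    return FOLLOWUP_PIPELINE if histories else STANDARD_PIPELINE


def trim_histories(histories: Sequence[object], max_items: int) -> List[object]:
    """Hypothetical helper: keep only the most recent max_items history entries."""
    if max_items <= 0:
        return []
    return list(histories)[-max_items:]
```

A test for long histories could feed trim_histories a few hundred entries and check that the selected pipeline and the prompt size stay stable.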
676-678: Minor refactoring for feedback pipeline.
This code re-indentation is fine, and storing a “correcting” status in _ask_feedback_results is consistent with the existing pattern. If the re-indentation was unintentional, consider placing it in a dedicated formatting commit to keep functional changes separate from style changes.
wren-ai-service/src/pipelines/generation/followup_sql_generation_reasoning.py (4)
19-35: Consider externalizing large system prompts.
This block defines a multi-line system prompt in the source code. To improve maintainability, consider extracting it into a dedicated file or configuration variable, which can make updates and translations easier.
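One possible shape for this, as a sketch only: load_system_prompt and the file path are hypothetical, and the fallback keeps the in-code prompt as the default so behavior is unchanged when no file is present.

```python
from pathlib import Path


def load_system_prompt(path: Path, default: str) -> str:
    """Hypothetical helper: read the prompt from a file, falling back to a default.

    Returns `default` when the file is missing or contains only whitespace.
    """
    try:
        text = path.read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        return default
    return text or default
```

The module-level constant would then be assigned once at import time, for example from a prompts directory next to the pipeline module (an assumed layout).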
37-67: Ensure no sensitive data is leaked in user prompt fields.
Building a large user prompt that includes database schemas, SQL samples, and user queries is powerful. Double-check logging configurations and potential debug statements to prevent unintentional exposure of sensitive content.
108-125: Initialize pipeline components with concurrency in mind.
Storing user queues in self._user_queues is suitable for handling streaming. However, confirm that concurrent usage (two calls from the same query_id) is either invalid or properly synchronized if it occurs.
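If duplicate consumers for one query_id should be rejected rather than synchronized, a small registry could make that explicit. This is a sketch under that assumption; QueueRegistry and its methods are hypothetical and do not exist in the PR.

```python
import asyncio
from typing import Dict, Set


class QueueRegistry:
    """Hypothetical registry: one queue and at most one consumer per query_id."""

    def __init__(self) -> None:
        self._user_queues: Dict[str, asyncio.Queue] = {}
        self._active_consumers: Set[str] = set()

    def get_queue(self, query_id: str) -> asyncio.Queue:
        # Create the queue on first access and reuse it afterwards.
        if query_id not in self._user_queues:
            self._user_queues[query_id] = asyncio.Queue()
        return self._user_queues[query_id]

    def claim(self, query_id: str) -> bool:
        # Returns False when another consumer already holds this query_id.
        if query_id in self._active_consumers:
            return False
        self._active_consumers.add(query_id)
        return True

    def release(self, query_id: str) -> None:
        # Drop both the consumer claim and the queue once streaming ends.
        self._active_consumers.discard(query_id)
        self._user_queues.pop(query_id, None)
```

Because asyncio code runs on a single thread, these check-then-set steps are safe as long as no await sits between the check and the update.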
140-164: Consider longer or configurable timeout in get_streaming_results.
Terminating after 120 seconds is reasonable as a default, but if processing large contexts or slow models, additional time may be needed. Providing a configurable timeout would be beneficial.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- deployment/kustomizations/base/cm.yaml (1 hunks)
- docker/config.example.yaml (1 hunks)
- wren-ai-service/src/globals.py (1 hunks)
- wren-ai-service/src/pipelines/generation/__init__.py (2 hunks)
- wren-ai-service/src/pipelines/generation/followup_sql_generation_reasoning.py (1 hunks)
- wren-ai-service/src/web/v1/services/ask.py (3 hunks)
- wren-ai-service/tools/config/config.example.yaml (1 hunks)
- wren-ai-service/tools/config/config.full.yaml (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: pytest
- GitHub Check: pytest
- GitHub Check: Analyze (go)
🔇 Additional comments (13)
docker/config.example.yaml (1)
129-130: Implementation of new followup SQL generation reasoning pipeline looks good.
The addition of the followup_sql_generation_reasoning pipeline with the default LLM is consistent with other pipeline configurations and properly positioned after the related sql_generation_reasoning pipeline.
wren-ai-service/tools/config/config.example.yaml (1)
143-144: Configuration consistency maintained across environments.
The addition of the followup_sql_generation_reasoning pipeline in this example config matches the implementation in the Docker configuration, ensuring consistent behavior across different environments.
wren-ai-service/src/pipelines/generation/__init__.py (2)
5-5: New module import for FollowUpSQLGenerationReasoning looks good.
The import statement follows the same pattern as other pipeline imports in this file.
38-38: Public API correctly updated to include the new pipeline.
The new pipeline class has been appropriately added to the __all__ list, making it available to importers of this module.
wren-ai-service/tools/config/config.full.yaml (1)
143-144: Full configuration properly updated with new pipeline.
The addition of the followup_sql_generation_reasoning pipeline in the full configuration is consistent with the other config files and properly positioned in the pipeline sequence.
deployment/kustomizations/base/cm.yaml (1)
177-178: Confirm model alignment and configurations.
The addition of the “followup_sql_generation_reasoning” pipeline with the same LLM provider is consistent with the other pipelines. Verify that all references, particularly in production and test configuration files, point to this correct model name and that no environment variables or settings are inadvertently overridden.
wren-ai-service/src/web/v1/services/ask.py (1)
753-755: Graceful handling for stopping feedback requests.
Marking the status as “stopped” in _ask_feedback_results is suitable. If multiple parallel requests might alter this state, confirm that you handle concurrency or repeated calls to avoid race conditions.
wren-ai-service/src/pipelines/generation/followup_sql_generation_reasoning.py (6)
1-18: Imports and logging look good.
Imports and logger usage are straightforward and appropriate for async pipeline handling.
70-88: Prompt function flow is clear.
The code merges multiple data points into a single prompt. This is concise and logical. No immediate issues noticed.
90-93: Check generator’s timeout or error handling.
Currently, you await the generator call without additional timeout or fallback logic. If the downstream LLM takes too long or returns empty data, consider handling or at least logging that scenario.
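A minimal sketch of such a guard, assuming the generator returns a dict with a "replies" list as the post_process step suggests. call_with_timeout, its parameters, and the fallback value are hypothetical and not part of the pipeline.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable, Dict

logger = logging.getLogger(__name__)


async def call_with_timeout(
    make_call: Callable[[], Awaitable[Dict[str, Any]]],
    timeout_s: float,
    fallback: Dict[str, Any],
) -> Dict[str, Any]:
    """Hypothetical wrapper: bound the wait and log slow or empty responses."""
    try:
        result = await asyncio.wait_for(make_call(), timeout=timeout_s)
    except asyncio.TimeoutError:
        logger.warning("Generator call exceeded %.1f seconds", timeout_s)
        return fallback
    if not result or not result.get("replies"):
        logger.warning("Generator returned no replies")
        return fallback
    return result
```

Whether to return a fallback or re-raise depends on how the caller in ask.py reports failures; returning a fallback is just one option.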
130-139: Streaming callback approach looks correct.
Placing message chunks into a per-user asyncio.Queue is a robust pattern for real-time streaming. This is well-designed.
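For readers unfamiliar with the pattern, a stripped-down sketch: one task puts chunks on a queue and finishes with a sentinel, while an async generator drains the queue until it sees the sentinel or times out. The "<DONE>" sentinel matches the one used in the pipeline; produce, consume, and demo are hypothetical names.

```python
import asyncio
from typing import AsyncIterator, List

DONE = "<DONE>"  # end-of-stream sentinel, as used in the pipeline


async def produce(queue: asyncio.Queue, chunks: List[str]) -> None:
    """Put each chunk on the queue, then signal the end of the stream."""
    for chunk in chunks:
        await queue.put(chunk)
    await queue.put(DONE)


async def consume(queue: asyncio.Queue, timeout_s: float = 1.0) -> AsyncIterator[str]:
    """Yield chunks until the sentinel arrives or no item shows up in time."""
    while True:
        try:
            item = await asyncio.wait_for(queue.get(), timeout=timeout_s)
        except asyncio.TimeoutError:
            break
        if item == DONE:
            break
        if item:  # skip empty chunks
            yield item


async def demo() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(produce(queue, ["SELECT", "", " 1"]))
    received = [chunk async for chunk in consume(queue)]
    await producer
    return received
```

The sketch catches asyncio.TimeoutError, since that is the name asyncio.wait_for raises under on every supported Python version.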
166-188: Run logic is straightforward.
The pipeline flow is clear. If any step fails within execute(), ensure the exception is captured upstream. Everything else is well structured.
190-200: Local testing block is beneficial.
Having a dry_run_pipeline call for quick tests or demos is commendable, and it won’t affect production.
c676edb to 90ef342
Actionable comments posted: 0
♻️ Duplicate comments (1)
wren-ai-service/src/pipelines/generation/followup_sql_generation_reasoning.py (1)
95-100: ⚠️ Potential issue: Add defensive check for empty replies.
The current implementation could trigger an index error if "replies" is missing or empty. Add safeguards to avoid runtime errors.
     @observe()
     def post_process(
         generate_sql_reasoning: dict,
     ) -> dict:
    -    return generate_sql_reasoning.get("replies")[0]
    +    replies = generate_sql_reasoning.get("replies", [])
    +    if not replies:
    +        return {}
    +    return replies[0]
🧹 Nitpick comments (6)
wren-ai-service/src/pipelines/generation/followup_sql_generation_reasoning.py (6)
140-164: Implement proper error handling in the streaming results method.
The current implementation has basic timeout handling, but there's no detailed error logging or recovery strategy for other potential exceptions during streaming. Consider adding more comprehensive error handling.

     async def get_streaming_results(self, query_id):
         async def _get_streaming_results(query_id):
             return await self._user_queues[query_id].get()

         if query_id not in self._user_queues:
             self._user_queues[
                 query_id
             ] = asyncio.Queue()  # Ensure the user's queue exists
         while True:
             try:
                 # Wait for an item from the user's queue
                 self._streaming_results = await asyncio.wait_for(
                     _get_streaming_results(query_id), timeout=120
                 )
                 if (
                     self._streaming_results == "<DONE>"
                 ):  # Check for end-of-stream signal
                     del self._user_queues[query_id]
                     break
                 if self._streaming_results:  # Check if there are results to yield
                     yield self._streaming_results
                     self._streaming_results = ""  # Clear after yielding
             except TimeoutError:
    +            logger.warning(f"Timeout while waiting for streaming results for query_id: {query_id}")
    +            break
    +        except Exception as e:
    +            logger.error(f"Error in get_streaming_results for query_id {query_id}: {str(e)}")
                 break
130-139: Ensure proper error handling in the streaming callback.
The streaming callback doesn't have any error handling, which could potentially lead to unhandled exceptions and affect the application's stability.

     def _streaming_callback(self, chunk, query_id):
    -    if query_id not in self._user_queues:
    -        self._user_queues[
    -            query_id
    -        ] = asyncio.Queue()  # Create a new queue for the user if it doesn't exist
    -    # Put the chunk content into the user's queue
    -    asyncio.create_task(self._user_queues[query_id].put(chunk.content))
    -    if chunk.meta.get("finish_reason"):
    -        asyncio.create_task(self._user_queues[query_id].put("<DONE>"))
    +    try:
    +        if query_id not in self._user_queues:
    +            self._user_queues[
    +                query_id
    +            ] = asyncio.Queue()  # Create a new queue for the user if it doesn't exist
    +        # Put the chunk content into the user's queue
    +        asyncio.create_task(self._user_queues[query_id].put(chunk.content))
    +        if chunk.meta.get("finish_reason"):
    +            asyncio.create_task(self._user_queues[query_id].put("<DONE>"))
    +    except Exception as e:
    +        logger.error(f"Error in streaming callback for query_id {query_id}: {str(e)}")
166-187: Add docstring to the run method.
The run method lacks a docstring that explains its purpose, parameters, and return values. Adding a comprehensive docstring would improve code maintainability.

     @observe(name="FollowupSQL Generation Reasoning")
     async def run(
         self,
         query: str,
         contexts: List[str],
         histories: List[AskHistory],
         sql_samples: Optional[List[str]] = None,
         configuration: Configuration = Configuration(),
         query_id: Optional[str] = None,
     ):
    +    """
    +    Runs the followup SQL generation reasoning pipeline.
    +
    +    Args:
    +        query (str): The user's query to process
    +        contexts (List[str]): Database schema contexts
    +        histories (List[AskHistory]): Previous questions and SQL queries
    +        sql_samples (Optional[List[str]], optional): Example SQL queries. Defaults to None.
    +        configuration (Configuration, optional): Configuration settings. Defaults to Configuration().
    +        query_id (Optional[str], optional): Unique identifier for the query. Defaults to None.
    +
    +    Returns:
    +        dict: The processed reasoning for SQL generation
    +    """
         logger.info("Followup SQL Generation Reasoning pipeline is running...")
         return await self._pipe.execute(
             ["post_process"],
             inputs={
                 "query": query,
                 "documents": contexts,
                 "histories": histories,
                 "sql_samples": sql_samples or [],
                 "configuration": configuration,
                 "query_id": query_id,
                 **self._components,
             },
         )
71-87: Add type hints for the prompt function return value.
The prompt function is missing proper type hinting for its return value. It returns a dict, but the specific structure of this dict isn't documented, which would be helpful for maintainers.

     @observe(capture_input=False)
     def prompt(
         query: str,
         documents: List[str],
         histories: List[AskHistory],
         sql_samples: List[str],
         prompt_builder: PromptBuilder,
         configuration: Configuration | None = Configuration(),
    -) -> dict:
    +) -> dict[str, Any]:
    +    """
    +    Creates a prompt for SQL reasoning generation.
    +
    +    Returns:
    +        dict[str, Any]: A dictionary containing the generated prompt
    +    """
         return prompt_builder.run(
             query=query,
             documents=documents,
             histories=histories,
             sql_samples=sql_samples,
             current_time=configuration.show_current_time(),
             language=configuration.language,
         )
190-199: Add error handling to the main block.
The main block for testing should include proper error handling to catch and log exceptions during pipeline execution.

     if __name__ == "__main__":
         from src.pipelines.common import dry_run_pipeline
    +    import logging
    +
    +    # Set up basic logging configuration for the test run
    +    logging.basicConfig(level=logging.INFO)
    +
    +    try:
    +        dry_run_pipeline(
    +            FollowUpSQLGenerationReasoning,
    +            "followup_sql_generation_reasoning",
    +            query="this is a test query",
    +            histories=[],
    +            contexts=[],
    +        )
    +    except Exception as e:
    +        logging.error(f"Error during dry run: {str(e)}")
    -    dry_run_pipeline(
    -        FollowUpSQLGenerationReasoning,
    -        "followup_sql_generation_reasoning",
    -        query="this is a test query",
    -        histories=[],
    -        contexts=[],
    -    )
151-153: Consider making the timeout configurable.
The timeout value of 120 seconds is hardcoded. It would be better to make this configurable, either through a class parameter or as part of the configuration object.

     class FollowUpSQLGenerationReasoning(BasicPipeline):
         def __init__(
             self,
             llm_provider: LLMProvider,
    +        streaming_timeout: int = 120,
             **kwargs,
         ):
    +        self._streaming_timeout = streaming_timeout
             self._user_queues = {}
             # ...rest of the method

         async def get_streaming_results(self, query_id):
             # ...existing code
             # Wait for an item from the user's queue
             self._streaming_results = await asyncio.wait_for(
    -            _get_streaming_results(query_id), timeout=120
    +            _get_streaming_results(query_id), timeout=self._streaming_timeout
             )
             # ...rest of the method
📜 Review details
📒 Files selected for processing (8)
- deployment/kustomizations/base/cm.yaml (1 hunks)
- docker/config.example.yaml (1 hunks)
- wren-ai-service/src/globals.py (1 hunks)
- wren-ai-service/src/pipelines/generation/__init__.py (2 hunks)
- wren-ai-service/src/pipelines/generation/followup_sql_generation_reasoning.py (1 hunks)
- wren-ai-service/src/web/v1/services/ask.py (1 hunks)
- wren-ai-service/tools/config/config.example.yaml (1 hunks)
- wren-ai-service/tools/config/config.full.yaml (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (7)
- docker/config.example.yaml
- wren-ai-service/tools/config/config.full.yaml
- wren-ai-service/src/pipelines/generation/__init__.py
- wren-ai-service/src/globals.py
- wren-ai-service/tools/config/config.example.yaml
- deployment/kustomizations/base/cm.yaml
- wren-ai-service/src/web/v1/services/ask.py
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: pytest
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
paopa left a comment
Overall lgtm. Can you also add the pipeline setup into other example configs like Ollama, Groq, etc.?
Summary by CodeRabbit
followup_sql_generation_reasoning utilizing the litellm_llm.default model in multiple configuration files.