feat(wren-ai-service): Add SQL Functions Support in Query Generation Pipeline (ai-env-changed)#1410
Conversation
WalkthroughThe pull request modifies the Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant AskService
participant SqlFunctionsPipeline
participant WrenEngine
User->>AskService: Submit AskRequest (with optional data_source)
AskService->>SqlFunctionsPipeline: Retrieve SQL functions concurrently
SqlFunctionsPipeline->>WrenEngine: Call get_func_list(data_source)
WrenEngine-->>SqlFunctionsPipeline: Return SQL functions list
SqlFunctionsPipeline-->>AskService: Provide SQL functions
AskService->>AskService: Process and generate SQL query
AskService-->>User: Return response
sequenceDiagram
participant Pipeline
participant Cache
participant RetrievalModule
Pipeline->>Cache: Check for SQL functions
alt Cache hit
Cache-->>Pipeline: Return cached SQL functions
else Cache miss
Pipeline->>RetrievalModule: Execute SQL function retrieval (async)
RetrievalModule-->>Pipeline: Return new SQL functions list
Pipeline->>Cache: Update cache with new data
end
Pipeline-->>Pipeline: Continue processing SQL functions
Possibly related PRs
Poem
Tip ⚡🧪 Multi-step agentic review comment chat (experimental)
✨ Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (6)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (5)
11-11: Remove unused import [pydantic.BaseModel].
Since this import is never referenced, removing it will keep dependencies streamlined.-from pydantic import BaseModel🧰 Tools
🪛 Ruff (0.8.2)
11-11:
pydantic.BaseModelimported but unusedRemove unused import:
pydantic.BaseModel(F401)
56-67: Consider adding error handling for failed HTTP requests.
Should the external call fail, it may be beneficial to gracefully handle exceptions (e.g., log the error, retry, or return an empty list) rather than letting the exception propagate.
91-97: Remove or utilize unused**kwargsconstructor parameter.
Currently,**kwargsis not used and may create confusion. If not needed, removing it simplifies parameter usage.def __init__( self, engine: Engine, engine_timeout: Optional[float] = 30.0, ttl: Optional[int] = 60 * 60 * 24, - **kwargs, ):
114-131:project_idparameter is never used inrun()method.
Consider removing or including logic to handleproject_idif it is indeed relevant to SQL function retrieval.
98-98: Verify concurrency safety ofTTLCache.
TTLCachemay not be fully thread-safe under heavy concurrency. If this pipeline could be invoked from multiple tasks concurrently, consider adding a lock or validating usage in a single-thread event loop.wren-ai-service/tests/pytest/pipelines/retrieval/sql_function.py (1)
43-55: Consider adding negative or error condition tests.
Adding tests for cases like network failures or invalid engine responses would add robustness and better reflect real-world scenarios.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
wren-ai-service/Justfile(1 hunks)wren-ai-service/src/globals.py(1 hunks)wren-ai-service/src/pipelines/generation/followup_sql_generation.py(6 hunks)wren-ai-service/src/pipelines/generation/sql_generation.py(6 hunks)wren-ai-service/src/pipelines/indexing/db_schema.py(2 hunks)wren-ai-service/src/pipelines/indexing/utils/helper.py(5 hunks)wren-ai-service/src/pipelines/retrieval/__init__.py(2 hunks)wren-ai-service/src/pipelines/retrieval/sql_functions.py(1 hunks)wren-ai-service/src/providers/engine/wren.py(1 hunks)wren-ai-service/src/web/v1/services/ask.py(7 hunks)wren-ai-service/tests/pytest/pipelines/indexing/test_helper.py(7 hunks)wren-ai-service/tests/pytest/pipelines/retrieval/sql_function.py(1 hunks)wren-ai-service/tools/config/config.example.yaml(1 hunks)
🧰 Additional context used
🪛 Ruff (0.8.2)
wren-ai-service/src/pipelines/retrieval/sql_functions.py
11-11: pydantic.BaseModel imported but unused
Remove unused import: pydantic.BaseModel
(F401)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Analyze (go)
🔇 Additional comments (34)
wren-ai-service/src/pipelines/indexing/utils/helper.py (3)
55-55: Good fix: Corrected typo in variable nameThe renaming from
COLUMN_PROPRECESSORStoCOLUMN_PREPROCESSORSimproves code clarity and consistency.
94-94: LGTM: Documentation updated to match variable nameDocumentation was properly updated to reflect the corrected variable name.
146-147: LGTM: Variable reference updated in load_helpers functionThe function now correctly references the renamed variable, ensuring consistency throughout the codebase.
wren-ai-service/tests/pytest/pipelines/indexing/test_helper.py (3)
5-5: LGTM: Updated import to use corrected variable nameImport statement correctly updated to use
COLUMN_PREPROCESSORS.
26-26: LGTM: Test functions updated to use corrected variable nameAll test functions now correctly reference the renamed
COLUMN_PREPROCESSORSvariable.Also applies to: 41-41, 53-53, 65-65
119-119: LGTM: Test assertions updated to use corrected variable nameThe mock object and assertions in
test_load_helpersnow correctly use the renamed variable.Also applies to: 130-132
wren-ai-service/src/pipelines/indexing/db_schema.py (2)
69-69: LGTM: Corrected variable referenceVariable reference updated to use the corrected name
COLUMN_PREPROCESSORS.
89-89:Details
❓ Verification inconclusive
Good enhancement: Added filtering for hidden columns
The new condition filters out columns where
isHiddenisTrue, which aligns with the PR objective to improve schema processing by filtering hidden columns. This helps keep the generated schema cleaner by excluding columns that shouldn't be visible.Let's verify that this doesn't cause any issues with the rest of the pipeline:
🏁 Script executed:
#!/bin/bash # Check for any references to isHidden in tests to ensure compatibility rg "isHidden" tests/ -A 5 -B 5Length of output: 77
Action: Please verify test coverage for the new
isHiddenfiltering logic.The change in
wren-ai-service/src/pipelines/indexing/db_schema.py(line 89) that checksif column.get("isHidden") is not Truesuccessfully filters out columns marked as hidden, which meets the PR's objective of cleaning up the generated schema. However, our initial attempt to search for test references to
"isHidden"failed because thetests/directory could not be found (error:tests/: No such file or directory (os error 2)).Please ensure that one of the following is addressed:
- Confirm that test references for
isHiddenexist in another directory (e.g.,test/or an alternative location), and update the automated verification accordingly.- Manually verify that the change does not negatively affect pipeline functionality, particularly in areas where tests would normally cover schema validations.
wren-ai-service/src/pipelines/retrieval/__init__.py (2)
6-6: LGTM: Added SqlFunctions importThe import statement is correctly added to make the SqlFunctions class available through the retrieval module.
16-16: LGTM: Added SqlFunctions to module exportsSqlFunctions is correctly added to the
__all__list, making it accessible when importing from the retrieval package.wren-ai-service/Justfile (1)
29-30: Command simplification looks goodThe
starttarget has been simplified by removing the dependency onuse-wren-ui-as-engine. This streamlines the start process by directly executing the main module without requiring the config update step.wren-ai-service/tools/config/config.example.yaml (1)
156-157: LGTM: Added SQL functions retrieval pipelineThe new pipeline entry for SQL functions retrieval is correctly configured to use the wren_ibis engine. This addition aligns well with the PR's objective to support SQL functions in the query generation pipeline.
wren-ai-service/src/globals.py (1)
130-133: LGTM: Added SQL functions retrieval to service containerThe SqlFunctions component is properly integrated into the service container with appropriate configuration parameters. This addition follows the established pattern for other pipeline components and correctly sets up the engine timeout.
wren-ai-service/src/providers/engine/wren.py (1)
153-173: Well-implemented SQL function retrieval with proper error handlingThe
get_func_listmethod provides a clean implementation for retrieving SQL functions from the engine with appropriate error handling. The method handles potential issues like timeouts and server errors, gracefully falling back to an empty list when errors occur.Consider adding a test case that verifies this method's behavior with different response scenarios (success, error, timeout).
wren-ai-service/src/web/v1/services/ask.py (6)
33-33: Good addition of data source configurationAdding an optional data source parameter with a sensible default value enhances flexibility while maintaining backward compatibility.
351-363: Proper integration of SQL functions retrievalThe SQL functions retrieval has been integrated correctly as a concurrent task alongside the existing retrievals. The implementation uses
asyncio.gatherappropriately to fetch all required data in parallel.
446-446: Correctly passing SQL functions to followup SQL generationSQL functions are properly passed to the follow-up SQL generation pipeline to enhance query context.
461-461: Correctly passing SQL functions to SQL generationSQL functions are properly passed to the standard SQL generation pipeline to enhance query context.
692-697: Code formatting improved for better readabilityThe parenthesis formatting change enhances code readability while maintaining the same functionality.
769-773: Code formatting improved for better readabilitySimilar formatting improvement as before, which keeps the style consistent throughout the file.
wren-ai-service/src/pipelines/generation/sql_generation.py (6)
19-19: Good import for the SqlFunction typeProperly importing the SqlFunction type from its module for type safety.
36-41: Well-structured template for SQL functionsThe template section for SQL functions follows the same pattern as other conditional blocks, which maintains consistency in the template structure. The conditional check ensures the section is only included when functions are available.
79-79: Type-safe parameter additionThe SQL functions parameter is properly type-hinted as
List[SqlFunction] | Nonewhich ensures type safety while allowing the parameter to be optional.
94-94: Successfully passing SQL functions to prompt builderSQL functions are properly passed to the prompt builder to be incorporated into the final prompt.
162-162: Consistent parameter in run methodThe run method correctly includes the SQL functions parameter with the same type hint as in the prompt function, maintaining consistency.
177-177: Successfully passing SQL functions to pipeline executionSQL functions are correctly passed as part of the inputs dictionary to the pipeline execution.
wren-ai-service/src/pipelines/generation/followup_sql_generation.py (6)
19-19: Good import for the SqlFunction typeProperly importing the SqlFunction type from its module for type safety.
41-46: Well-structured template for SQL functionsThe template section for SQL functions follows the same pattern as other conditional blocks, maintaining consistency in the template structure. The conditional check ensures the section is only included when functions are available.
89-89: Type-safe parameter additionThe SQL functions parameter is properly type-hinted as
List[SqlFunction] | Nonewhich ensures type safety while allowing the parameter to be optional.
108-108: Successfully passing SQL functions to prompt builderSQL functions are properly passed to the prompt builder to be incorporated into the final prompt.
174-174: Consistent parameter in run methodThe run method correctly includes the SQL functions parameter with the same type hint as in the prompt function, maintaining consistency.
190-190: Successfully passing SQL functions to pipeline executionSQL functions are correctly passed as part of the inputs dictionary to the pipeline execution.
wren-ai-service/tests/pytest/pipelines/retrieval/sql_function.py (2)
5-5: Test structure is well-organized.
The tests properly align with the pipeline’s new functionality, ensuring coverage of core logic.
65-72: Excellent coverage of cache behavior with case-insensitive data sources.
This ensures that the pipeline logic doesn't redundantly fetch data for data sources that differ only in case.
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (3)
wren-ai-service/docs/config_examples/config.ollama.yaml (1)
132-133: New SQL Functions Retrieval Pipeline Entry AddedThe new pipeline entry
sql_functions_retrievalwith the engine set towren_ibisis correctly added to the pipelines list. The change is consistent with the PR objective to support SQL functions retrieval via the WrenIbis engine. Please verify that the engine identifierwren_ibisis defined and integrated properly in the codebase.wren-ai-service/docs/config_examples/config.google_ai_studio.yaml (1)
142-143: SQL Functions Retrieval Pipe AddedThe addition of the
sql_functions_retrievalpipe with thewren_ibisengine is consistent with the PR objectives. Please ensure that any additional engine-specific configurations (e.g., connection info, TTL settings) required bywren_ibisare documented and set up correctly elsewhere in your configuration.wren-ai-service/tools/config/config.full.yaml (1)
153-154: New SQL Functions Retrieval Pipeline EntryThe new entry for the
sql_functions_retrievalpipe using thewren_ibisengine has been correctly added. Verify that its position within the pipeline list reflects the intended processing order and that any associated parameters (such as caching behavior) are appropriately configured in the service container.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
deployment/kustomizations/base/cm.yaml(1 hunks)docker/config.example.yaml(1 hunks)wren-ai-service/docs/config_examples/config.azure.yaml(1 hunks)wren-ai-service/docs/config_examples/config.deepseek.yaml(1 hunks)wren-ai-service/docs/config_examples/config.google_ai_studio.yaml(1 hunks)wren-ai-service/docs/config_examples/config.groq.yaml(1 hunks)wren-ai-service/docs/config_examples/config.ollama.yaml(1 hunks)wren-ai-service/tools/config/config.full.yaml(1 hunks)
🔇 Additional comments (5)
docker/config.example.yaml (1)
133-134: Addition of SQL Functions Retrieval Pipeline in Docker ConfigThe new pipeline entry for
sql_functions_retrievalusing thewren_ibisengine is integrated in the Docker configuration file. The YAML indentation and structure appear correct. Ensure that the service container and related components use this pipeline entry consistently.wren-ai-service/docs/config_examples/config.groq.yaml (1)
135-136: Integrating SQL Functions Retrieval into GROQ ConfigThe addition of the
sql_functions_retrievalpipeline entry with enginewren_ibisin this configuration file follows the same structure as in the other examples. This consistent update will help maintain parity across different deployment options.wren-ai-service/docs/config_examples/config.azure.yaml (1)
143-144: New SQL Functions Retrieval Entry in Azure ConfigThe new pipeline entry
sql_functions_retrievalwithwren_ibisas the engine is properly added to this configuration. The YAML structure is consistent with other config examples. Double-check that all Azure-specific settings remain in place as required.deployment/kustomizations/base/cm.yaml (1)
188-189: ConfigMap Update: SQL Functions Retrieval PipelineThe ConfigMap for the AI service now includes the new pipeline entry
sql_functions_retrievalwith the enginewren_ibis. This update aligns with the SQL functions support enhancement and appears well-integrated. Please ensure that downstream services referencing this configuration are updated to leverage the new functionality.wren-ai-service/docs/config_examples/config.deepseek.yaml (1)
153-154: Enhancement: Inclusion of SQL Functions Retrieval PipeThe new
sql_functions_retrievalpipe utilizing thewren_ibisengine is properly incorporated in this Deepseek configuration file. This ensures consistency across your configuration examples. Please double-check that any Deepseek-specific settings do not conflict with this new addition.
cyyeh
left a comment
There was a problem hiding this comment.
overall lgtm
one question, do we need to add wren-ibis to config.yaml?
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (4)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (4)
19-56: Well-structured SqlFunction class with clean implementationThe SqlFunction class effectively builds string representations of SQL functions with proper handling of different parameter types and return types. The implementation is clean and follows good practices.
A few suggestions to enhance robustness:
class SqlFunction: - _expr: str = None + _expr: str def __init__(self, definition: dict): + if not isinstance(definition, dict): + raise TypeError("definition must be a dictionary") + if "name" not in definition: + raise ValueError("definition must contain a 'name' key")
59-73: API retrieval function looks good but could benefit from error handlingThe get_functions implementation correctly uses aiohttp for async requests to retrieve SQL functions from the engine.
Consider adding error handling to manage potential API failures:
async def get_functions( engine: WrenIbis, data_source: str, engine_timeout: float = 30.0, ) -> Dict[str, Any]: async with aiohttp.ClientSession() as session: - func_list = await engine.get_func_list( - session=session, - data_source=data_source, - timeout=engine_timeout, - ) - return {"func_list": func_list} + try: + func_list = await engine.get_func_list( + session=session, + data_source=data_source, + timeout=engine_timeout, + ) + return {"func_list": func_list} + except Exception as e: + logger.error(f"Error retrieving SQL functions for {data_source}: {e}") + return {"func_list": []}
95-116: SqlFunctions class initialization is well-structuredThe class properly extends BasicPipeline and initializes with appropriate defaults. The 24-hour TTL is a reasonable default for SQL function definitions that don't change frequently.
Consider adding a docstring to explain the purpose of this class and its parameters.
class SqlFunctions(BasicPipeline): + """ + Pipeline component for retrieving and caching SQL functions from a data source. + + Args: + engine: The engine instance used to retrieve SQL functions + engine_timeout: Timeout in seconds for engine requests (default: 30.0) + ttl: Time-to-live in seconds for cached SQL functions (default: 24 hours) + """ def __init__( self, engine: Engine,
117-141: Solid implementation of run method with good caching strategyThe run method properly checks the cache before executing the pipeline, which will improve performance. Good use of logging to track cache hits.
Consider adding a debug log when function retrieval is initiated:
async def run( self, data_source: str, project_id: Optional[str] = None, ) -> List[SqlFunction]: logger.info( f"Project ID: {project_id} SQL Functions Retrieval pipeline is running..." ) _data_source = data_source.lower() if _data_source in self._cache: logger.info(f"Hit cache of SQL Functions for {_data_source}") return self._cache[_data_source] + else: + logger.debug(f"Cache miss for SQL Functions of {_data_source}, fetching from source")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
deployment/kustomizations/base/cm.yaml(2 hunks)docker/config.example.yaml(2 hunks)wren-ai-service/docs/config_examples/config.azure.yaml(2 hunks)wren-ai-service/docs/config_examples/config.deepseek.yaml(2 hunks)wren-ai-service/docs/config_examples/config.google_ai_studio.yaml(4 hunks)wren-ai-service/docs/config_examples/config.groq.yaml(4 hunks)wren-ai-service/docs/config_examples/config.ollama.yaml(3 hunks)wren-ai-service/src/pipelines/retrieval/sql_functions.py(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- wren-ai-service/docs/config_examples/config.ollama.yaml
- docker/config.example.yaml
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: pytest
- GitHub Check: Analyze (python)
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
🔇 Additional comments (14)
wren-ai-service/docs/config_examples/config.azure.yaml (2)
38-42: Engine Addition: Newwren_ibisEngine ConfigurationThe new engine entry for
wren_ibiswith the endpointhttp://wren-ibis:8000is correctly added. Please verify that this endpoint is consistent with your service deployment (e.g. DNS/service discovery in your cloud environment) to avoid connectivity issues.
148-149: Pipeline Addition: SQL Functions Retrieval PipeThe new pipeline, named
sql_functions_retrieval, correctly references thewren_ibisengine. Ensure that this pipeline integrates well with the SQL functions retrieval and that any caching policies or TTL settings (if applicable) are properly propagated downstream.wren-ai-service/docs/config_examples/config.deepseek.yaml (2)
53-57: Engine Addition: Includingwren_ibisfor DeepSeekA new engine entry for
wren_ibiswith endpointhttp://wren-ibis:8000is introduced. Confirm that this endpoint is reachable and that its configuration matches the expected settings for your DeepSeek environment.
147-148: Pipeline Addition: SQL Functions RetrievalThe addition of the
sql_functions_retrievalpipeline is properly configured to use thewren_ibisengine. Please double-check that the downstream processing of SQL function contexts aligns with the updated SQL generation logic.deployment/kustomizations/base/cm.yaml (3)
44-44: Environmental Variable Update: IBIS_SERVER_ENDPOINTThe
IBIS_SERVER_ENDPOINTis now set tohttp://wren-ibis-server-svc:8000. Verify that this value reflects the correct internal service name and port as deployed in your Kubernetes cluster.
93-97: Engine Configuration in ConfigMapA new engine entry for
wren_ibisis appended with the endpointhttp://wren-ibis-server-svc:8000. Please ensure that this configuration is consistent with your service discovery and that downstream components referencing this engine can resolve it correctly.
193-194: Pipeline Addition: Incorporation of SQL Functions RetrievalThe pipeline
sql_functions_retrievalnow references the newwren_ibisengine. It is advisable to perform end-to-end tests to verify that SQL function definitions are correctly cached and retrieved as intended.wren-ai-service/docs/config_examples/config.google_ai_studio.yaml (2)
42-46: Engine Addition: Integration ofwren_ibisThe new engine
wren_ibiswith the endpointhttp://wren-ibis:8000has been added. Ensure that this configuration aligns with your Google AI Studio deployment requirements and network setup.
147-148: Pipeline Addition: SQL Functions RetrievalThe SQL functions retrieval pipe is added and set to use the
wren_ibisengine. Confirm that this pipe is triggered appropriately during SQL generation, and the SQL functions context is correctly appended to your prompts.wren-ai-service/docs/config_examples/config.groq.yaml (2)
35-39: Engine Addition:wren_ibisfor GROQA new engine entry for
wren_ibisis introduced with the endpointhttp://wren-ibis:8000. Please verify that this endpoint works within your GROQ environment and does not conflict with any legacy configurations.
140-141: Pipeline Update: SQL Functions Retrieval IntegrationThe pipeline for
sql_functions_retrievalis now included and assigned to thewren_ibisengine. Make sure that this integration is validated against the SQL generation modules to ensure that the added SQL functions context is effective.wren-ai-service/src/pipelines/retrieval/sql_functions.py (3)
75-80: Simple and effective function for converting to SqlFunction objectsThis function is concise and clear. No issues found.
82-90: Cache implementation is clean and effectiveThe caching mechanism is straightforward and fits the purpose well. Good use of TTLCache.
142-151: Well-designed dry-run capabilityThe main block provides a convenient way to test the pipeline in isolation, which is good for development and debugging.
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (4)
wren-ai-service/tests/pytest/pipelines/retrieval/sql_function.py (4)
74-78: Consider adding a more descriptive comment for edge case handlingThis test covers an important edge case with
Noneparameters, but a comment explaining the expected behavior would improve maintainability.def test_sql_function_param_type_none(): + """Test that None param_types are handled gracefully by defaulting to 'any'.""" func = SqlFunction( {"name": "test_func", "param_types": None, "return_type": "text"} ) assert str(func) == "test_func(any) -> text"
81-85: Consider adding a descriptive comment for this edge case as wellSimilar to the previous test, a comment would improve understanding of why this test is important.
def test_sql_function_return_type_none(): + """Test that None return_type is handled gracefully by defaulting to 'any'.""" func = SqlFunction( {"name": "test_func", "param_types": "int,text", "return_type": None} ) assert str(func) == "test_func($0: int, $1: text) -> any"
88-96: Add test for TTL caching expirationThe tests cover basic caching functionality but don't test TTL expiration, which was mentioned in the PR description as a feature of the caching system.
@pytest.mark.asyncio async def test_sql_functions_pipeline_cache_expiration(sql_functions_pipeline, monkeypatch): """Test that cached SQL functions expire after TTL.""" # Mock time.time to control time progression current_time = 0 def mock_time(): nonlocal current_time return current_time monkeypatch.setattr("time.time", mock_time) # First call should fetch from engine result1 = await sql_functions_pipeline.run("postgres") # Advance time past TTL current_time += sql_functions_pipeline.cache_ttl + 1 # Second call should fetch from engine again due to TTL expiration result2 = await sql_functions_pipeline.run("postgres") # Verify engine was called twice assert sql_functions_pipeline._components["engine"].get_func_list.call_count == 2 assert result1 == result2
1-97: Consider adding tests for additional edge casesThe test suite is solid but could be enhanced by adding tests for empty function lists, error handling from the engine, and functions with a large number of parameters.
Here are a few additional test cases to consider:
@pytest.mark.asyncio async def test_sql_functions_pipeline_empty_list(sql_functions_pipeline, monkeypatch): """Test behavior when no SQL functions are available.""" # Override the mock to return an empty list sql_functions_pipeline._components["engine"].get_func_list = AsyncMock(return_value=[]) result = await sql_functions_pipeline.run("postgres") assert len(result) == 0 @pytest.mark.asyncio async def test_sql_functions_pipeline_engine_error(sql_functions_pipeline, monkeypatch): """Test error handling when engine fails to retrieve functions.""" # Override the mock to raise an exception sql_functions_pipeline._components["engine"].get_func_list = AsyncMock( side_effect=Exception("Engine error") ) # Verify exception is properly handled or propagated with pytest.raises(Exception, match="Engine error"): await sql_functions_pipeline.run("postgres")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
wren-ai-service/tests/pytest/pipelines/retrieval/sql_function.py(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: pytest
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
🔇 Additional comments (7)
wren-ai-service/tests/pytest/pipelines/retrieval/sql_function.py (7)
5-5: Good job importing directly from the module under testThis shows a clear understanding of the module structure and makes the test more readable.
7-16: Well-structured mock data for comprehensive testingThe test mock data covers different parameter combinations and return types which is important for thorough testing of SQL function handling.
19-28: Well-organized fixtures using appropriate mocking techniquesGood use of
AsyncMockfor the asyncget_func_listmethod and proper fixture dependency setup withsql_functions_pipelinedepending onmock_engine.
31-35: Good basic initialization testThis test verifies both string representation (
__str__) and object representation (__repr__), ensuring consistent formatting.
43-54: Comprehensive pipeline execution test with caching verificationGood test coverage that confirms both the correct function retrieval and the caching mechanism by verifying the mock is only called once.
57-62: Good test for data source differentiationThis test properly verifies that different data sources trigger separate function list retrievals.
65-71: Excellent case sensitivity handling testThe case insensitivity test is important for usability, ensuring users don't encounter issues with inconsistent capitalization.
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (6)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (6)
19-21: Type hint the class variable for clarity.The class variable
_exprshould have a proper type hint to make the code more maintainable and readable.- class SqlFunction: - _expr: str = None + class SqlFunction: + _expr: Optional[str] = None
23-35: Enhance type annotations for better code clarity.The _extract method could benefit from more specific type annotations. Using TypedDict might be more appropriate for the definition parameter to document expected keys.
- def __init__(self, definition: dict): - def _extract() -> tuple[str, list, str]: + def __init__(self, definition: Dict[str, Any]): + def _extract() -> tuple[str, List[str], Union[str, List[str]]]:Also, consider adding docstrings to describe the expected format of the definition dictionary.
31-33: Improve clarity of return type handling.The condition for updating the return type when it's "same as input" or "same as arg types" is not immediately clear. Consider adding a comment or refactoring to make the intent more obvious.
if return_type in ["same as input", "same as arg types"]: + # Use parameter types as the return type when it's defined as "same as input" or "same as arg types" return_type = param_types
95-116: The initialization of SqlFunctions could be improved.The TTLCache is initialized with hardcoded maxsize, which could be made configurable via a parameter. Also consider adding a docstring to explain the purpose and usage of this class.
class SqlFunctions(BasicPipeline): + """ + Pipeline for retrieving and caching SQL functions from a data source. + + This pipeline retrieves SQL functions from a specified data source using + the provided engine, processes them into SqlFunction objects, and caches + them with a time-to-live (TTL) for efficient retrieval. + """ def __init__( self, engine: Engine, engine_timeout: Optional[float] = 30.0, ttl: Optional[int] = 60 * 60 * 24, + cache_maxsize: Optional[int] = 100, **kwargs, ) -> None: - self._cache = TTLCache(maxsize=100, ttl=ttl) + self._cache = TTLCache(maxsize=cache_maxsize, ttl=ttl)
1-16: Consider organizing imports according to conventions.The imports are mostly well-organized, but it's common practice to group imports by stdlib, third-party, and local application imports with a blank line between groups.
import logging import sys from typing import Any, Dict, List, Optional import aiohttp from cachetools import TTLCache from hamilton import base from hamilton.async_driver import AsyncDriver from hamilton.function_modifiers import extract_fields from langfuse.decorators import observe from src.core.engine import Engine from src.core.pipeline import BasicPipeline from src.providers.engine.wren import WrenIbis logger = logging.getLogger("wren-ai-service")
36-43: Consider adding validation for parameter types.The
_param_exprfunction could benefit from validation to ensure parameter types are properly formatted, especially before stripping.def _param_expr(param_type: str, index: int) -> str: if param_type == "any": return "any" + if not param_type: + return "any" # Default to "any" for empty parameter types + param_type = param_type.strip() param_name = f"${index}" return f"{param_name}: {param_type}"
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
wren-ai-service/src/pipelines/retrieval/sql_functions.py(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: pytest
🔇 Additional comments (4)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (4)
143-151: LGTM: The main block for testing seems well implemented.The main block provides a convenient way to test the pipeline. This is a good practice for local development and testing.
75-80: LGTM: The sql_functions function is well-implemented.This function efficiently creates SqlFunction instances from the provided function list. The implementation is concise and appropriate.
82-90: LGTM: The cache implementation is clean and effective.The function properly stores SQL functions in the TTL cache for the specified data source. The implementation is straightforward and serves its purpose well.
44-50: LGTM: The parsing and formatting logic is clearly implemented.The function extraction, parameter processing, and final expression formation are well structured and easy to follow.
fbe1bca to
e486ea3
Compare
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (2)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (1)
21-58: Potential edge case with array-type return types.When
return_typeis set to the value ofparam_types(lines 33-35), it may become a list instead of a string. This will produce a string like['int','int']in__str__and__repr__. If that is unintended or overly verbose, consider converting it to a single representative type or formatting it into a comma-separated string.wren-ai-service/src/pipelines/indexing/project_meta.py (1)
62-99: Consider error handling for pipeline execution.The
runmethod executes the pipeline but does not catch potential exceptions fromvalidator,cleaner, orwriter. A try/except can provide better resilience and user feedback if something goes wrong.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (25)
deployment/kustomizations/base/cm.yaml(3 hunks)docker/config.example.yaml(2 hunks)wren-ai-service/Justfile(1 hunks)wren-ai-service/docs/config_examples/config.azure.yaml(4 hunks)wren-ai-service/docs/config_examples/config.deepseek.yaml(4 hunks)wren-ai-service/docs/config_examples/config.google_ai_studio.yaml(4 hunks)wren-ai-service/docs/config_examples/config.groq.yaml(4 hunks)wren-ai-service/docs/config_examples/config.ollama.yaml(3 hunks)wren-ai-service/src/globals.py(2 hunks)wren-ai-service/src/pipelines/generation/followup_sql_generation.py(6 hunks)wren-ai-service/src/pipelines/generation/sql_generation.py(6 hunks)wren-ai-service/src/pipelines/generation/utils/sql.py(0 hunks)wren-ai-service/src/pipelines/indexing/__init__.py(2 hunks)wren-ai-service/src/pipelines/indexing/db_schema.py(2 hunks)wren-ai-service/src/pipelines/indexing/project_meta.py(1 hunks)wren-ai-service/src/pipelines/indexing/utils/helper.py(5 hunks)wren-ai-service/src/pipelines/retrieval/__init__.py(2 hunks)wren-ai-service/src/pipelines/retrieval/sql_functions.py(1 hunks)wren-ai-service/src/providers/engine/wren.py(1 hunks)wren-ai-service/src/web/v1/services/ask.py(6 hunks)wren-ai-service/src/web/v1/services/semantics_preparation.py(2 hunks)wren-ai-service/tests/pytest/pipelines/indexing/test_helper.py(7 hunks)wren-ai-service/tests/pytest/pipelines/retrieval/sql_function.py(1 hunks)wren-ai-service/tools/config/config.example.yaml(1 hunks)wren-ai-service/tools/config/config.full.yaml(2 hunks)
💤 Files with no reviewable changes (1)
- wren-ai-service/src/pipelines/generation/utils/sql.py
🚧 Files skipped from review as they are similar to previous changes (10)
- docker/config.example.yaml
- wren-ai-service/Justfile
- wren-ai-service/src/pipelines/indexing/db_schema.py
- wren-ai-service/docs/config_examples/config.ollama.yaml
- wren-ai-service/src/pipelines/retrieval/init.py
- wren-ai-service/src/pipelines/indexing/utils/helper.py
- wren-ai-service/tests/pytest/pipelines/indexing/test_helper.py
- wren-ai-service/docs/config_examples/config.deepseek.yaml
- wren-ai-service/docs/config_examples/config.groq.yaml
- wren-ai-service/src/pipelines/generation/followup_sql_generation.py
🧰 Additional context used
🧬 Code Definitions (3)
wren-ai-service/src/pipelines/indexing/__init__.py (1)
wren-ai-service/src/pipelines/indexing/project_meta.py (1) (1)
ProjectMeta(62:102)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (1)
wren-ai-service/src/providers/engine/wren.py (2) (2)
WrenIbis(86:173)get_func_list(153:173)
wren-ai-service/src/globals.py (2)
wren-ai-service/src/pipelines/indexing/project_meta.py (1) (1)
ProjectMeta(62:102)wren-ai-service/src/pipelines/retrieval/sql_functions.py (1) (1)
SqlFunctions(97:163)
🪛 Ruff (0.8.2)
wren-ai-service/src/pipelines/retrieval/sql_functions.py
11-11: haystack.document_stores.types.DocumentStore imported but unused
Remove unused import: haystack.document_stores.types.DocumentStore
(F401)
🔇 Additional comments (35)
wren-ai-service/src/web/v1/services/ask.py (3)
350-361: Effectively integrated concurrent SQL functions retrievalThe code effectively uses
asyncio.gatherto concurrently fetch SQL functions alongside SQL pairs and instructions, improving overall request efficiency.
444-444: Correctly passes SQL functions to generation pipelinesThe SQL functions are appropriately passed to both standard and followup SQL generation pipelines, enabling their use in query generation as intended.
Also applies to: 459-459
690-695: Clean formatting improvementThe parentheses placement improves readability by making the assignment structure clearer in these response creation sections.
Also applies to: 767-771
wren-ai-service/docs/config_examples/config.azure.yaml (2)
38-42: Well-structured wren_ibis engine configurationThe engine configuration follows the established pattern and provides the necessary endpoint information for the wren_ibis service.
148-150: Appropriate SQL functions retrieval pipeline configurationThe pipeline configuration correctly specifies dependencies on the wren_ibis engine and qdrant document store, ensuring proper integration with the rest of the system.
wren-ai-service/src/pipelines/indexing/__init__.py (1)
94-94: Complete ProjectMeta integrationThe ProjectMeta class is properly imported and exported, maintaining consistency with the module's existing pattern for pipeline components.
Also applies to: 104-104
wren-ai-service/tools/config/config.example.yaml (1)
156-160: Comprehensive pipeline configurationsThe additions of sql_functions_retrieval and project_meta_indexing pipelines are properly configured with their required dependencies, ensuring they'll work correctly with the rest of the system.
wren-ai-service/tools/config/config.full.yaml (1)
156-160: Correctly integrated SQL functions and project metadata pipelinesThe additions of
sql_functions_retrievalandproject_meta_indexingpipelines align well with the PR objective to enhance query generation with SQL functions support. Thesql_functions_retrievalpipeline appropriately leverages thewren_ibisengine which will provide access to SQL functions from data sources.wren-ai-service/src/providers/engine/wren.py (1)
153-173: Well-structured SQL function retrieval implementationThe
get_func_listmethod is well-implemented with proper error handling for both timeout and general exceptions. The method returns an empty list rather than raising an exception when errors occur, which is a good defensive programming approach for a retrieval function.Two minor suggestions to enhance robustness:
Consider adding a more specific type annotation for the return value, such as
list[dict[str, Any]]if the response contains structured function metadata.The timeout parameter in the session request should use
aiohttp.ClientTimeout(total=timeout)to match the pattern used in other methods of this class.deployment/kustomizations/base/cm.yaml (2)
93-97: Appropriately configured wren_ibis engineThe addition of the
wren_ibisengine configuration is correctly implemented, pointing to the right service endpoint.
193-197: Consistent pipeline configuration for SQL functionsThe SQL functions retrieval and project metadata pipelines are consistently configured here, matching the implementation in other configuration files.
wren-ai-service/docs/config_examples/config.google_ai_studio.yaml (3)
42-46: Example configuration correctly updated with wren_ibis engineThe
wren_ibisengine configuration is properly added to the Google AI Studio example, ensuring consistent documentation.
147-151: Example pipeline configuration properly updatedThe SQL functions retrieval and project metadata indexing pipelines are correctly added to the example configuration, maintaining consistency across all configuration files.
160-161: Minor formatting adjustmentsThe small formatting changes to the settings section maintain consistency in the configuration file structure.
Also applies to: 171-171
wren-ai-service/src/globals.py (2)
74-76: LGTM: Good integration of project metadata componentThe addition of the
project_metacomponent to the service container follows the established pattern and will properly integrate with the semantics preparation service.
133-136: LGTM: Well-structured SQL functions retrieval integrationThe
sql_functions_retrievalcomponent is correctly integrated with proper engine timeout configuration, which aligns with the overall PR objective of adding SQL functions support.wren-ai-service/src/pipelines/generation/sql_generation.py (4)
19-19: LGTM: Appropriate import additionAdding the
SqlFunctionimport is necessary for the type annotation in the function signatures.
36-41: LGTM: Well-structured template additionThe SQL functions section is properly added to the prompt template with appropriate conditional handling.
79-79: LGTM: Clean parameter addition to prompt functionThe
sql_functionsparameter is properly added to the function signature with appropriate type annotation and default value, and correctly passed to the prompt builder.Also applies to: 94-94
162-162: LGTM: Consistent parameter addition to run methodThe
sql_functionsparameter is consistently added to therunmethod and properly passed to the execution pipeline, maintaining the function's interface consistency.Also applies to: 177-177
wren-ai-service/src/web/v1/services/semantics_preparation.py (2)
87-87: LGTM: Properly extended task listAdding
project_metato the list of tasks for semantics preparation ensures the new component is properly initialized.
144-149: LGTM: Complete resource cleanupIncluding
project_metain the list of components to clean ensures proper resource cleanup during deletion operations.wren-ai-service/tests/pytest/pipelines/retrieval/sql_function.py (6)
7-16: LGTM: Good test data setupThe mock function definitions and list provide good coverage for testing different scenarios.
19-29: LGTM: Well-structured test fixturesThe fixtures for mocking the engine and creating the SQL functions pipeline are appropriately designed to isolate test dependencies.
31-41: LGTM: Clear basic functionality testsThese tests verify the core functionality of the
SqlFunctionclass, including string representation and handling of empty parameters.
43-55: LGTM: Comprehensive pipeline test with caching verificationThe test properly verifies that the pipeline returns the expected functions and that results are correctly cached for subsequent calls.
57-72: LGTM: Thorough datasource handling testsThese tests verify the pipeline's behavior with different data sources and case sensitivity, confirming proper caching strategy.
74-97: LGTM: Good edge case coverageThe tests for edge cases (null parameter types, null return types, and special return type values) ensure the class handles these scenarios gracefully.
wren-ai-service/src/pipelines/retrieval/sql_functions.py (3)
60-75: Check for discrepancies between the docstring/annotation and actual return type.You're returning a list of dictionaries from the WrenIbis engine (based on
SqlFunctionusage), while the function signature and decorators indicateList[str]. Please verify that the engine response matches the annotated type to avoid runtime type mismatches.
84-92: Validate concurrency aspects of TTLCache.
cachetools.TTLCacheis not guaranteed to be thread-safe or coroutine-safe by default, which could cause race conditions ifcacheis called concurrently. Confirm your pipeline usage is single-threaded or add the necessary synchronization if concurrency is expected.
140-164: Revisit exception handling for pipeline execution.A previous review comment suggested adding a
try/exceptaround pipeline execution to avoid crashes when_pipe.executefails. The method still has no error handling. If you decide to let exceptions bubble up, mark it intentional; otherwise, wrap it in a try/except block.wren-ai-service/src/pipelines/indexing/project_meta.py (4)
1-19: Overall imports and logger definition look good.
22-27: Check assumption aboutres["mdl"].If
validator.run(mdl=mdl_str)does not return a"mdl"key, accessingres["mdl"]may raise a KeyError. Consider handling possible missing keys.- return dict(mdl=res["mdl"]) + if "mdl" not in res: + logger.warning("No 'mdl' key found in validator results.") + return dict(mdl={}) + return dict(mdl=res["mdl"])
29-57: Pipeline steps for chunk, clean, and write appear well-structured.
100-103: Direct asynchronous cleaning logic is appropriate.
e486ea3 to
c8433a0
Compare
There was a problem hiding this comment.
Actionable comments posted: 3
♻️ Duplicate comments (3)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (3)
77-82:⚠️ Potential issueParameter type mismatch in function annotation.
The function annotation indicates
func_listis of typeList[str], but the code treats each item as a dictionary when constructingSqlFunctionobjects.-def sql_functions( - func_list: List[str], -) -> List[SqlFunction]: +def sql_functions( + func_list: List[Dict], +) -> List[SqlFunction]: return [SqlFunction(definition=func) for func in func_list]
140-163: 🛠️ Refactor suggestionAdd error handling to pipeline execution.
The pipeline execution lacks error handling, which could cause the entire service to fail if there's an exception.
@observe(name="SQL Functions Retrieval") async def run( self, project_id: Optional[str] = None, ) -> List[SqlFunction]: logger.info( f"Project ID: {project_id} SQL Functions Retrieval pipeline is running..." ) metadata = await self._retrieve_metadata(project_id) _data_source = metadata.get("data_source", "local_file") if _data_source in self._cache: logger.info(f"Hit cache of SQL Functions for {_data_source}") return self._cache[_data_source] input = { "data_source": _data_source, "project_id": project_id, **self._components, **self._configs, } - result = await self._pipe.execute(["cache"], inputs=input) - return result["cache"] + try: + result = await self._pipe.execute(["cache"], inputs=input) + return result["cache"] + except Exception as e: + logger.error(f"Error executing SQL Functions Retrieval pipeline: {str(e)}") + return [] # Return empty list on error
123-139:⚠️ Potential issueHandle case when no documents are found.
The code directly accesses
documents[0]without checking if the list is empty first, which could raise an IndexError.async def _retrieve_metadata(self, project_id: str) -> dict[str, Any]: filters = None if project_id is not None: filters = { "operator": "AND", "conditions": [ {"field": "project_id", "operator": "==", "value": project_id}, ], } result = await self._retriever.run(query_embedding=[], filters=filters) documents = result["documents"] # only one document for a project, thus we can return the first one - doc = documents[0] - return doc.meta + if not documents: + logger.warning(f"No metadata documents found for project_id: {project_id}") + return {"data_source": "local_file"} # Default data source + doc = documents[0] + return doc.meta
🧹 Nitpick comments (7)
wren-ai-service/src/pipelines/indexing/project_meta.py (4)
29-41: Consider making the default data source configurable.The function uses a hardcoded default value "local_file" for the data source.
- data_source = mdl.get("data_source", "local_file") + data_source = mdl.get("data_source", config.get("default_data_source", "local_file"))
54-56: Add error handling for the document writing operation.The write function doesn't handle potential errors during document writing.
@observe(capture_input=False) async def write(clean: dict[str, Any], writer: DocumentWriter) -> None: - return await writer.run(documents=clean["documents"]) + try: + return await writer.run(documents=clean["documents"]) + except Exception as e: + logger.error(f"Error writing documents: {e}") + raise
62-83: Ensure DuplicatePolicy.OVERWRITE is the intended behavior.The pipeline is configured to overwrite existing documents with the same ID. Verify this is the intended behavior for project metadata.
Also, consider adding docstrings to the class to describe its purpose and behavior.
class ProjectMeta(BasicPipeline): + """ + Pipeline for processing and indexing project metadata. + + This pipeline validates MDL strings, creates document objects, + cleans existing documents if necessary, and writes the new metadata + to the document store. + """ def __init__( self, document_store_provider: DocumentStoreProvider, **kwargs, ) -> None:
100-102: Add logging for the clean method.Consider adding logging similar to the run method for better observability.
@observe(name="Clean Documents for Project Meta") async def clean(self, project_id: Optional[str] = None) -> None: + logger.info(f"Project ID: {project_id}, Cleaning project meta documents...") await self._components["cleaner"].run(project_id=project_id)wren-ai-service/src/providers/engine/wren.py (1)
153-174: Well-structured SQL function retrieval implementationThe
get_func_listmethod is well-implemented with proper error handling for both timeout and unexpected errors. The implementation correctly returns an empty list in error cases rather than throwing exceptions that could disrupt the pipeline.However, consider adding the timeout parameter to the session.get call directly:
- async with session.get(api_endpoint, timeout=timeout) as response: + async with session.get(api_endpoint, timeout=aiohttp.ClientTimeout(total=timeout)) as response:This would be consistent with the pattern used in the
execute_sqlmethod above.wren-ai-service/src/pipelines/retrieval/sql_functions.py (2)
11-11: Remove unused import.The
DocumentStoreimport fromhaystack.document_stores.typesis not used in this file.-from haystack.document_stores.types import DocumentStore🧰 Tools
🪛 Ruff (0.8.2)
11-11:
haystack.document_stores.types.DocumentStoreimported but unusedRemove unused import:
haystack.document_stores.types.DocumentStore(F401)
60-75: Add error handling for the API call.The function makes an API call but lacks error handling for potential failures. Though error handling may be implemented in the engine's
get_func_listmethod (as shown in the relevant code snippets), it's good practice to also handle errors at this level.@observe(capture_input=False) @extract_fields(dict(func_list=List[str])) async def get_functions( engine: WrenIbis, data_source: str, engine_timeout: float = 30.0, ) -> Dict[str, Any]: async with aiohttp.ClientSession() as session: - func_list = await engine.get_func_list( - session=session, - data_source=data_source, - timeout=engine_timeout, - ) - return {"func_list": func_list} + try: + func_list = await engine.get_func_list( + session=session, + data_source=data_source, + timeout=engine_timeout, + ) + return {"func_list": func_list} + except Exception as e: + logger.error(f"Error retrieving SQL functions for {data_source}: {str(e)}") + return {"func_list": []}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (25)
deployment/kustomizations/base/cm.yaml(3 hunks)docker/config.example.yaml(2 hunks)wren-ai-service/Justfile(1 hunks)wren-ai-service/docs/config_examples/config.azure.yaml(4 hunks)wren-ai-service/docs/config_examples/config.deepseek.yaml(4 hunks)wren-ai-service/docs/config_examples/config.google_ai_studio.yaml(4 hunks)wren-ai-service/docs/config_examples/config.groq.yaml(4 hunks)wren-ai-service/docs/config_examples/config.ollama.yaml(3 hunks)wren-ai-service/src/globals.py(2 hunks)wren-ai-service/src/pipelines/generation/followup_sql_generation.py(6 hunks)wren-ai-service/src/pipelines/generation/sql_generation.py(6 hunks)wren-ai-service/src/pipelines/generation/utils/sql.py(0 hunks)wren-ai-service/src/pipelines/indexing/__init__.py(2 hunks)wren-ai-service/src/pipelines/indexing/db_schema.py(2 hunks)wren-ai-service/src/pipelines/indexing/project_meta.py(1 hunks)wren-ai-service/src/pipelines/indexing/utils/helper.py(5 hunks)wren-ai-service/src/pipelines/retrieval/__init__.py(2 hunks)wren-ai-service/src/pipelines/retrieval/sql_functions.py(1 hunks)wren-ai-service/src/providers/engine/wren.py(1 hunks)wren-ai-service/src/web/v1/services/ask.py(5 hunks)wren-ai-service/src/web/v1/services/semantics_preparation.py(2 hunks)wren-ai-service/tests/pytest/pipelines/indexing/test_helper.py(7 hunks)wren-ai-service/tests/pytest/pipelines/retrieval/sql_function.py(1 hunks)wren-ai-service/tools/config/config.example.yaml(1 hunks)wren-ai-service/tools/config/config.full.yaml(2 hunks)
💤 Files with no reviewable changes (1)
- wren-ai-service/src/pipelines/generation/utils/sql.py
🚧 Files skipped from review as they are similar to previous changes (15)
- wren-ai-service/src/pipelines/retrieval/init.py
- wren-ai-service/docs/config_examples/config.azure.yaml
- docker/config.example.yaml
- wren-ai-service/tests/pytest/pipelines/indexing/test_helper.py
- wren-ai-service/Justfile
- wren-ai-service/tools/config/config.full.yaml
- wren-ai-service/docs/config_examples/config.groq.yaml
- wren-ai-service/src/pipelines/indexing/db_schema.py
- wren-ai-service/src/pipelines/indexing/init.py
- wren-ai-service/src/pipelines/indexing/utils/helper.py
- wren-ai-service/docs/config_examples/config.google_ai_studio.yaml
- wren-ai-service/docs/config_examples/config.ollama.yaml
- wren-ai-service/src/globals.py
- wren-ai-service/tools/config/config.example.yaml
- wren-ai-service/src/web/v1/services/semantics_preparation.py
🧰 Additional context used
🧬 Code Definitions (1)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (1)
wren-ai-service/src/providers/engine/wren.py (2) (2)
WrenIbis(86:173)get_func_list(153:173)
🪛 Ruff (0.8.2)
wren-ai-service/src/pipelines/retrieval/sql_functions.py
11-11: haystack.document_stores.types.DocumentStore imported but unused
Remove unused import: haystack.document_stores.types.DocumentStore
(F401)
wren-ai-service/src/pipelines/generation/followup_sql_generation.py
89-89: Undefined name List
(F821)
173-173: Undefined name List
(F821)
wren-ai-service/src/pipelines/generation/sql_generation.py
78-78: Undefined name List
(F821)
160-160: Undefined name List
(F821)
🪛 GitHub Actions: AI Service Test
wren-ai-service/src/pipelines/generation/followup_sql_generation.py
[error] 89-89: NameError: name 'List' is not defined
[error] 89-89: NameError: name 'List' is not defined
[error] 89-89: NameError: name 'List' is not defined
[error] 89-89: NameError: name 'List' is not defined
[error] 89-89: NameError: name 'List' is not defined
[error] 89-89: NameError: name 'List' is not defined
[error] 89-89: NameError: name 'List' is not defined
[error] 89-89: NameError: name 'List' is not defined
🔇 Additional comments (20)
wren-ai-service/src/pipelines/indexing/project_meta.py (5)
1-17: Well-structured imports for the project metadata pipeline.The imports are logically organized and cover all required dependencies for this pipeline implementation.
22-26: Good use of decorators for observation and field extraction.The
@observedecorator helps with monitoring pipeline execution, while@extract_fieldscleanly handles the output transformation.
84-98: Good use of the observe decorator for the main pipeline execution.The run method is well-structured and provides appropriate logging.
105-113: Good practice including a dry run capability.Including a main block with dry run functionality makes it easier to test the pipeline during development.
44-51:Details
✅ Verification successful
Verify the cleaner implementation intention.
The
cleanfunction runs the cleaner but returns the original chunk unchanged. Confirm if this is intentional or if the cleaner should modify the documents in some way.
🏁 Script executed:
#!/bin/bash # Check the implementation of DocumentCleaner to understand its behavior rg -A 10 "class DocumentCleaner" --type pyLength of output: 925
DocumentCleaner’s Side-Effect Verified
- The
DocumentCleaner(defined inwren-ai-service/src/pipelines/indexing/__init__.py) is designed to clear documents from the specified document stores.- Its
runmethod performs this deletion as a side-effect, so returning the original chunk unchanged in thecleanfunction is intentional.- No changes to the logic are required unless further clarification is needed—consider adding a comment or updating docstrings to explicitly document this behavior.
wren-ai-service/src/web/v1/services/ask.py (3)
373-375: Integration of SQL functions retrievalThis is a good addition that fetches SQL functions based on the project ID, which will enhance the SQL generation capabilities by providing more context to the LLM.
455-455: Properly passing SQL functions to generation pipelinesThe SQL functions are correctly passed to both follow-up and standard SQL generation pipelines, ensuring consistent behavior across different query paths.
Also applies to: 470-470
701-706: Improved code readability with parenthesesThe code has been reformatted to use parentheses for long assignments, which improves readability and follows Python style guidelines for line continuations.
Also applies to: 778-782
deployment/kustomizations/base/cm.yaml (2)
93-97: Proper engine configuration for SQL functionsThe added configuration for the
wren_ibisengine with the correct endpoint is essential for the SQL functions retrieval feature.
193-198: Well-defined pipeline entries for new functionalityThe configuration adds both
sql_functions_retrievalandproject_meta_indexingpipelines with their required components. This ensures the new SQL functions feature is properly integrated into the configuration system.wren-ai-service/src/pipelines/generation/sql_generation.py (3)
36-42: Well-structured SQL functions template sectionThe template additions for SQL functions are well-structured with proper conditionals and iteration, ensuring they'll only be included when available.
78-78: Consistent parameter additions for SQL functionsThe
sql_functionsparameter is consistently added to both thepromptfunction and therunmethod with the same type signature, maintaining API consistency.Also applies to: 160-160
🧰 Tools
🪛 Ruff (0.8.2)
78-78: Undefined name
List(F821)
92-92: Proper parameter passing for SQL functionsSQL functions are correctly passed to both the prompt builder and pipe execution, ensuring the data flows through the pipeline as expected.
Also applies to: 175-175
wren-ai-service/src/pipelines/generation/followup_sql_generation.py (2)
41-46: LGTM! SQL functions support added to prompt template.The template correctly handles the conditional inclusion of SQL functions when they are provided, making this feature optional and backward compatible.
173-173: MissingListtype import for parameter annotation.The
Listtype is used in the type annotation but is not imported, causing test failures.🧰 Tools
🪛 Ruff (0.8.2)
173-173: Undefined name
List(F821)
wren-ai-service/docs/config_examples/config.deepseek.yaml (3)
53-56: LGTM! Added Wren Ibis engine configuration.The configuration for the new
wren_ibisengine is properly defined with the correct endpoint.
158-160: LGTM! Added SQL functions retrieval pipeline.The configuration properly defines the new pipeline for retrieving SQL functions, specifying the necessary engine and document store.
161-162: LGTM! Added project meta indexing pipeline.The configuration correctly defines the new pipeline for project metadata indexing, which is necessary for retrieving datasource information.
wren-ai-service/tests/pytest/pipelines/retrieval/sql_function.py (1)
1-97: LGTM! Comprehensive test coverage for SQL functions.The test suite provides excellent coverage for the
SqlFunctionandSqlFunctionsclasses, testing initialization, caching mechanism, case sensitivity, and special parameter/return type handling.Particularly good:
- Mocking the engine response
- Testing cache hits
- Verifying behavior with different data sources
- Testing edge cases like empty parameters and special return types
wren-ai-service/src/pipelines/retrieval/sql_functions.py (1)
21-57: LGTM! Well-designed SqlFunction class with proper string representation.The class effectively handles different parameter types and provides a clean string representation of SQL functions. The special case handling for return types that depend on parameter types is a nice touch.
…deploy and clean func
b29feb5 to
15c8153
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (3)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (3)
77-82: 🛠️ Refactor suggestionType annotation mismatch in sql_functions method.
The function expects dictionaries but is annotated with List[str].
@observe(capture_input=False) def sql_functions( - func_list: List[str], + func_list: List[dict], ) -> List[SqlFunction]: return [SqlFunction(definition=func) for func in func_list]
162-163: 🛠️ Refactor suggestionAdd error handling to the run method.
The execute method might fail if there are issues with the engine or data source. Adding error handling would make the code more robust.
input = { "data_source": _data_source, "project_id": project_id, **self._components, **self._configs, } - result = await self._pipe.execute(["cache"], inputs=input) - return result["cache"] + try: + result = await self._pipe.execute(["cache"], inputs=input) + return result["cache"] + except Exception as e: + logger.error(f"Error executing SQL Functions Retrieval pipeline for {_data_source}: {str(e)}") + return []
136-138:⚠️ Potential issueAdd check for empty documents list.
If no documents are found, accessing documents[0] will raise an IndexError.
# only one document for a project, thus we can return the first one + if not documents: + logger.warning(f"No metadata documents found for project_id: {project_id}") + return {} doc = documents[0] return doc.meta
🧹 Nitpick comments (8)
wren-ai-service/src/pipelines/indexing/project_meta.py (6)
22-26: Good implementation of validate_mdl, but consider adding docstring and error handling.The function correctly validates MDL strings, but lacks documentation and error handling for potential validation failures.
@observe(capture_input=False, capture_output=False) @extract_fields(dict(mdl=dict[str, Any])) def validate_mdl(mdl_str: str, validator: MDLValidator) -> dict[str, Any]: + """ + Validate MDL string using the provided validator. + + Args: + mdl_str: The MDL string to validate + validator: The validator to use + + Returns: + Dictionary containing validated MDL + """ + try: res = validator.run(mdl=mdl_str) return dict(mdl=res["mdl"]) + except Exception as e: + logger.error(f"MDL validation failed: {e}") + raise
54-56: Improve error handling in the write function.The write function should handle potential errors that might occur during the document writing process.
@observe(capture_input=False) async def write(clean: dict[str, Any], writer: DocumentWriter) -> None: + try: return await writer.run(documents=clean["documents"]) + except Exception as e: + logger.error(f"Error writing documents: {e}") + raise
62-82: Add class docstring and consider parameterizing hardcoded values.The ProjectMeta class lacks documentation and has a hardcoded dataset name.
class ProjectMeta(BasicPipeline): + """ + Pipeline for processing and indexing project metadata. + + This pipeline validates MDL strings, chunks them into documents, + cleans existing documents, and writes new documents to the store. + """ + DATASET_NAME = "project_meta" + def __init__( self, document_store_provider: DocumentStoreProvider, + dataset_name: str = DATASET_NAME, **kwargs, ) -> None: - store = document_store_provider.get_store(dataset_name="project_meta") + store = document_store_provider.get_store(dataset_name=dataset_name) self._components = { "validator": MDLValidator(), "cleaner": DocumentCleaner([store]), "writer": AsyncDocumentWriter( document_store=store, policy=DuplicatePolicy.OVERWRITE, ), } self._final = "write" super().__init__( AsyncDriver({}, sys.modules[__name__], result_builder=base.DictResult()) )
84-98: Add input validation to the run method.The run method should validate its inputs before executing the pipeline.
@observe(name="Project Meta Indexing") async def run( self, mdl_str: str, project_id: Optional[str] = None ) -> dict[str, Any]: + if not mdl_str: + raise ValueError("MDL string cannot be empty") + logger.info( f"Project ID: {project_id}, Project Meta Indexing pipeline is running..." ) return await self._pipe.execute( [self._final], inputs={ "mdl_str": mdl_str, "project_id": project_id, **self._components, }, )
100-102: Add logging to the clean method.For consistency with the run method, the clean method should also log when it's executed.
@observe(name="Clean Documents for Project Meta") async def clean(self, project_id: Optional[str] = None) -> None: + logger.info(f"Project ID: {project_id}, Cleaning project meta documents...") await self._components["cleaner"].run(project_id=project_id)
105-113: The dry run could benefit from more comprehensive test data.The current dry run uses minimal test data. Consider enhancing it with more realistic MDL data to better test the pipeline.
if __name__ == "__main__": from src.pipelines.common import dry_run_pipeline dry_run_pipeline( ProjectMeta, "project_meta_indexing", - mdl_str='{"data_source": "local_file"}', + mdl_str='''{ + "data_source": "local_file", + "tables": [ + {"name": "example_table", "columns": [{"name": "id", "type": "integer"}]} + ] + }''', project_id="test", )wren-ai-service/src/pipelines/retrieval/sql_functions.py (2)
11-11: Remove unused import.The DocumentStore import is not used anywhere in this file.
-from haystack.document_stores.types import DocumentStore🧰 Tools
🪛 Ruff (0.8.2)
11-11:
haystack.document_stores.types.DocumentStoreimported but unusedRemove unused import:
haystack.document_stores.types.DocumentStore(F401)
60-75: Add error handling to the get_functions method.While the WrenIbis.get_func_list method already has error handling, it would be good to add logging here as well.
@observe(capture_input=False) @extract_fields(dict(func_list=List[str])) async def get_functions( engine: WrenIbis, data_source: str, engine_timeout: float = 30.0, ) -> Dict[str, Any]: async with aiohttp.ClientSession() as session: + logger.info(f"Retrieving SQL functions from {data_source}") func_list = await engine.get_func_list( session=session, data_source=data_source, timeout=engine_timeout, ) + logger.info(f"Retrieved {len(func_list)} SQL functions from {data_source}") return {"func_list": func_list}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (25)
deployment/kustomizations/base/cm.yaml(3 hunks)docker/config.example.yaml(2 hunks)wren-ai-service/Justfile(1 hunks)wren-ai-service/docs/config_examples/config.azure.yaml(4 hunks)wren-ai-service/docs/config_examples/config.deepseek.yaml(4 hunks)wren-ai-service/docs/config_examples/config.google_ai_studio.yaml(4 hunks)wren-ai-service/docs/config_examples/config.groq.yaml(4 hunks)wren-ai-service/docs/config_examples/config.ollama.yaml(3 hunks)wren-ai-service/src/globals.py(2 hunks)wren-ai-service/src/pipelines/generation/followup_sql_generation.py(6 hunks)wren-ai-service/src/pipelines/generation/sql_generation.py(6 hunks)wren-ai-service/src/pipelines/generation/utils/sql.py(0 hunks)wren-ai-service/src/pipelines/indexing/__init__.py(2 hunks)wren-ai-service/src/pipelines/indexing/db_schema.py(2 hunks)wren-ai-service/src/pipelines/indexing/project_meta.py(1 hunks)wren-ai-service/src/pipelines/indexing/utils/helper.py(5 hunks)wren-ai-service/src/pipelines/retrieval/__init__.py(2 hunks)wren-ai-service/src/pipelines/retrieval/sql_functions.py(1 hunks)wren-ai-service/src/providers/engine/wren.py(1 hunks)wren-ai-service/src/web/v1/services/ask.py(5 hunks)wren-ai-service/src/web/v1/services/semantics_preparation.py(2 hunks)wren-ai-service/tests/pytest/pipelines/indexing/test_helper.py(7 hunks)wren-ai-service/tests/pytest/pipelines/retrieval/sql_function.py(1 hunks)wren-ai-service/tools/config/config.example.yaml(1 hunks)wren-ai-service/tools/config/config.full.yaml(2 hunks)
💤 Files with no reviewable changes (1)
- wren-ai-service/src/pipelines/generation/utils/sql.py
🚧 Files skipped from review as they are similar to previous changes (20)
- wren-ai-service/src/pipelines/retrieval/init.py
- wren-ai-service/docs/config_examples/config.azure.yaml
- wren-ai-service/docs/config_examples/config.google_ai_studio.yaml
- wren-ai-service/tools/config/config.example.yaml
- docker/config.example.yaml
- wren-ai-service/src/pipelines/indexing/init.py
- wren-ai-service/tests/pytest/pipelines/indexing/test_helper.py
- wren-ai-service/docs/config_examples/config.ollama.yaml
- wren-ai-service/src/providers/engine/wren.py
- wren-ai-service/docs/config_examples/config.groq.yaml
- deployment/kustomizations/base/cm.yaml
- wren-ai-service/src/web/v1/services/semantics_preparation.py
- wren-ai-service/src/pipelines/generation/sql_generation.py
- wren-ai-service/docs/config_examples/config.deepseek.yaml
- wren-ai-service/src/pipelines/indexing/db_schema.py
- wren-ai-service/src/pipelines/indexing/utils/helper.py
- wren-ai-service/Justfile
- wren-ai-service/src/pipelines/generation/followup_sql_generation.py
- wren-ai-service/tools/config/config.full.yaml
- wren-ai-service/src/globals.py
🧰 Additional context used
🧬 Code Definitions (1)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (1)
wren-ai-service/src/providers/engine/wren.py (2) (2)
WrenIbis(86:173)get_func_list(153:173)
🪛 Ruff (0.8.2)
wren-ai-service/src/pipelines/retrieval/sql_functions.py
11-11: haystack.document_stores.types.DocumentStore imported but unused
Remove unused import: haystack.document_stores.types.DocumentStore
(F401)
🔇 Additional comments (5)
wren-ai-service/src/web/v1/services/ask.py (3)
373-375: SQL function retrieval added to the ask pipeline.The addition of SQL function retrieval enhances the query generation capabilities by providing SQL function context to the language model.
455-455: SQL functions integrated into both query generation pipelines.SQL functions are now passed to both standard and follow-up SQL generation pipelines, allowing them to incorporate function context in the generated queries.
Also applies to: 470-470
701-706: Code formatting improvements for better readability.The multiline assignments have been reformatted using parentheses for consistency and improved readability.
Also applies to: 778-782
wren-ai-service/tests/pytest/pipelines/retrieval/sql_function.py (1)
1-97: Comprehensive test suite for SQL Functions implementation.The test suite thoroughly covers the SqlFunction and SqlFunctions classes, including initialization, caching behavior, and handling of different parameter types and edge cases. The tests verify both basic functionality and important edge cases like empty parameters and special return types.
The test coverage is excellent, with tests for:
- Basic SqlFunction initialization
- Empty parameter handling
- Pipeline execution and caching
- Case sensitivity
- Various parameter and return type scenarios
wren-ai-service/src/pipelines/retrieval/sql_functions.py (1)
21-58: Well-implemented SqlFunction class.The SqlFunction class provides a clean representation of SQL functions with proper handling of various parameter and return type scenarios.
The implementation:
- Properly extracts name, parameters, and return type
- Handles special cases like "same as input" return types
- Provides clear string representation of SQL functions
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (1)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (1)
96-121: Enhance class documentation.Consider adding more comprehensive docstrings to explain the purpose of the
SqlFunctionsclass, its initialization parameters, and how it integrates with the overall pipeline architecture.class SqlFunctions(BasicPipeline): + """ + Pipeline component for retrieving SQL functions from data sources. + + This class retrieves SQL functions from the specified engine, processes them + into a structured format, and caches the results for efficient subsequent + retrieval. It integrates with the project metadata system to determine the + appropriate data source. + + Attributes: + _retriever: The retriever for project metadata + _cache: TTL cache for storing SQL functions + _components: Dictionary of component instances + _configs: Dictionary of configuration parameters + """ def __init__( self, engine: Engine, document_store_provider: DocumentStoreProvider, engine_timeout: Optional[float] = 30.0, ttl: Optional[int] = 60 * 60 * 24, **kwargs, ) -> None: + """ + Initialize the SqlFunctions pipeline. + + Args: + engine: The engine instance for retrieving SQL functions + document_store_provider: Provider for document stores + engine_timeout: Timeout for engine requests in seconds + ttl: Time-to-live for cache entries in seconds + **kwargs: Additional keyword arguments + """
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
wren-ai-service/src/pipelines/retrieval/sql_functions.py(1 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (1)
wren-ai-service/src/providers/engine/wren.py (2) (2)
WrenIbis(86:173)get_func_list(153:173)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: pytest
- GitHub Check: Analyze (go)
🔇 Additional comments (7)
wren-ai-service/src/pipelines/retrieval/sql_functions.py (7)
59-74: Add error handling for the engine.get_func_list call.The
get_functionsmethod makes an API call but lacks error handling. Consider adding try/except blocks to gracefully handle potential errors from the Wren engine.async def get_functions( engine: WrenIbis, data_source: str, engine_timeout: float = 30.0, ) -> Dict[str, Any]: async with aiohttp.ClientSession() as session: - func_list = await engine.get_func_list( - session=session, - data_source=data_source, - timeout=engine_timeout, - ) - return {"func_list": func_list} + try: + func_list = await engine.get_func_list( + session=session, + data_source=data_source, + timeout=engine_timeout, + ) + return {"func_list": func_list} + except Exception as e: + logger.error(f"Error retrieving SQL functions for {data_source}: {str(e)}") + return {"func_list": []}
76-81: Mismatch between function parameter type andSqlFunctionconstructor.Although
sql_functionsis annotated to acceptList[str], each element is treated as a dictionary when constructingSqlFunction. This will cause errors if the incoming list truly contains strings. Update either the annotation or the function logic to ensure consistency.- def sql_functions(func_list: List[str]) -> List[SqlFunction]: + def sql_functions(func_list: List[dict]) -> List[SqlFunction]: return [SqlFunction(definition=func) for func in func_list]
123-139: Handle case when no documents are found.
documents[0]will raise an IndexError if the list is empty. Consider adding a condition to handle scenarios where project metadata is missing or invalid.async def _retrieve_metadata(self, project_id: str) -> dict[str, Any]: filters = None if project_id is not None: filters = { "operator": "AND", "conditions": [ {"field": "project_id", "operator": "==", "value": project_id}, ], } result = await self._retriever.run(query_embedding=[], filters=filters) documents = result["documents"] # only one document for a project, thus we can return the first one - doc = documents[0] - return doc.meta + if not documents: + logger.warning(f"No metadata documents found for project_id: {project_id}") + return {"data_source": "local_file"} # Default data source + doc = documents[0] + return doc.meta
140-164: Add error handling to the run method.The
runmethod should include error handling for the pipeline execution to prevent crashes in case of failures.@observe(name="SQL Functions Retrieval") async def run( self, project_id: Optional[str] = None, ) -> List[SqlFunction]: logger.info( f"Project ID: {project_id} SQL Functions Retrieval pipeline is running..." ) metadata = await self._retrieve_metadata(project_id) _data_source = metadata.get("data_source", "local_file") if _data_source in self._cache: logger.info(f"Hit cache of SQL Functions for {_data_source}") return self._cache[_data_source] input = { "data_source": _data_source, "project_id": project_id, **self._components, **self._configs, } - result = await self._pipe.execute(["cache"], inputs=input) - return result["cache"] + try: + result = await self._pipe.execute(["cache"], inputs=input) + return result["cache"] + except Exception as e: + logger.error(f"Error executing SQL Functions Retrieval pipeline for {_data_source}: {str(e)}") + # Return an empty list if the pipeline execution fails + return []
20-57: Good implementation of SqlFunction class.The
SqlFunctionclass effectively handles different parameter types and return types, with special handling for cases where the return type is "same as input" or "same as arg types". The string representation is well-formed and consistent.
82-91: LGTM: Cache function implementation.The cache function correctly stores SQL functions in a TTL cache and returns the cached value.
166-174: Good dry run testing setup.The code includes a useful main block for executing a dry run of the pipeline, which is helpful for testing and development purposes.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
wren-ai-service/src/pipelines/indexing/project_meta.py (2)
29-41:⚠️ Potential issueThe chunk function still creates an empty document without MDL content.
The function creates a Document with metadata, but it doesn't include the actual MDL content in the Document. This issue was previously flagged in a past review and hasn't been addressed.
@observe(capture_input=False) def chunk( mdl: dict[str, Any], project_id: Optional[str] = None, ) -> dict[str, Any]: addition = {"project_id": project_id} if project_id else {} data_source = mdl.get("dataSource", "local_file").lower() document = Document( id=str(uuid.uuid4()), + content=mdl, # Include the actual MDL content in the document meta={"data_source": data_source, **addition}, ) return {"documents": [document]}
44-51:⚠️ Potential issueThe clean function doesn't actually use the chunk parameter.
The clean function accepts a chunk parameter but doesn't use it at all. This issue was previously flagged in a past review and hasn't been addressed.
@observe(capture_input=False, capture_output=False) async def clean( chunk: dict[str, Any], cleaner: DocumentCleaner, project_id: Optional[str] = None, ) -> dict[str, Any]: await cleaner.run(project_id=project_id) + # Either use the chunk parameter in the cleaning process + # or document why it's not being used return chunk
🧹 Nitpick comments (5)
wren-ai-service/src/pipelines/indexing/project_meta.py (5)
54-56: Remove the unnecessary return statement.The
async def writefunction usesreturn awaitwhich is redundant. In this case, simply usingawaitis cleaner and more idiomatic.@observe(capture_input=False) async def write(clean: dict[str, Any], writer: DocumentWriter) -> None: - return await writer.run(documents=clean["documents"]) + await writer.run(documents=clean["documents"])
85-98: Consider adding error handling in the run method.The run method lacks error handling. If the pipeline execution fails, it would be helpful to catch the exception, log it, and return a meaningful error response rather than allowing the exception to propagate up.
@observe(name="Project Meta Indexing") async def run( self, mdl_str: str, project_id: Optional[str] = None ) -> dict[str, Any]: logger.info( f"Project ID: {project_id}, Project Meta Indexing pipeline is running..." ) + try: return await self._pipe.execute( [self._final], inputs={ "mdl_str": mdl_str, "project_id": project_id, **self._components, }, ) + except Exception as e: + logger.error(f"Project Meta Indexing pipeline failed: {e}") + raise
100-102: Add logging to the clean method.The clean method should include logging similar to the run method for consistency and observability.
@observe(name="Clean Documents for Project Meta") async def clean(self, project_id: Optional[str] = None) -> None: + logger.info(f"Project ID: {project_id}, Cleaning documents for Project Meta...") await self._components["cleaner"].run(project_id=project_id)
62-83: Consider adding docstrings to the class and methods.The
ProjectMetaclass and its methods lack docstrings that explain their purpose, parameters, and return values. Adding these would improve code readability and maintainability.class ProjectMeta(BasicPipeline): + """ + Pipeline for processing and indexing project metadata. + + This pipeline validates, chunks, cleans, and writes project metadata + to a document store. + """ def __init__( self, document_store_provider: DocumentStoreProvider, **kwargs, ) -> None: + """ + Initialize the ProjectMeta pipeline. + + Args: + document_store_provider: Provider for accessing document stores + **kwargs: Additional arguments + """ store = document_store_provider.get_store(dataset_name="project_meta")
105-113: Consider adding a complete example with more detailed MDL.The dry run example uses a very minimal MDL with just the data_source field. Consider providing a more comprehensive example that better demonstrates the pipeline's functionality, especially if there are specific fields expected in the MDL that would be useful for testing.
if __name__ == "__main__": from src.pipelines.common import dry_run_pipeline dry_run_pipeline( ProjectMeta, "project_meta_indexing", - mdl_str='{"data_source": "local_file"}', + mdl_str=''' + { + "data_source": "local_file", + "tables": [ + {"name": "example_table", "columns": ["col1", "col2"]} + ], + "relationships": [] + } + ''', project_id="test", )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
wren-ai-service/src/pipelines/indexing/project_meta.py(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: pytest
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
🔇 Additional comments (1)
wren-ai-service/src/pipelines/indexing/project_meta.py (1)
1-18: LGTM: Imports and logging setup are well-organized.The imports are properly organized, starting with standard library imports, then third-party libraries, and finally local modules. The logging configuration is also appropriately set up.
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (1)
wren-ai-service/eval/preparation.py (1)
239-239: Appropriate addition of data source identifiers.Adding the
"dataSource": "DUCKDB"field to the MDL objects is consistent with the PR's objective of supporting SQL functions in the query generation pipeline. This identifier allows the system to determine which database engine's SQL functions to retrieve during query generation.Consider extracting "DUCKDB" to a constant at the module level for better maintainability if you anticipate supporting other database engines in the future:
+ DATABASE_ENGINE = "DUCKDB" ... - "dataSource": "DUCKDB", + "dataSource": DATABASE_ENGINE, ... - "dataSource": "DUCKDB", + "dataSource": DATABASE_ENGINE,Also applies to: 323-323
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
wren-ai-service/eval/preparation.py(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: pytest
- GitHub Check: Analyze (go)
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (1)
wren-ai-service/src/force_update_config.py (1)
25-25: Update print statement to reflect conditional engine assignment.The current print message indicates that all pipelines have been updated to 'wren_ui', but this is no longer accurate since the SQL functions retrieval pipeline now uses 'wren_ibis'.
- print("Successfully updated engine names to 'wren_ui' in all pipelines") + print("Successfully updated engine names in all pipelines")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
wren-ai-service/Justfile(2 hunks)wren-ai-service/src/force_update_config.py(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- wren-ai-service/Justfile
⏰ Context from checks skipped due to timeout of 90000ms (3)
- GitHub Check: pytest
- GitHub Check: Analyze (javascript-typescript)
- GitHub Check: Analyze (go)
🔇 Additional comments (1)
wren-ai-service/src/force_update_config.py (1)
16-19: LGTM! Engine assignment based on pipeline name implemented correctly.The condition correctly sets the engine to "wren_ibis" for the SQL functions retrieval pipeline while maintaining "wren_ui" for all other pipelines, which aligns with the PR's objective to support SQL functions.
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
wren-ai-service/src/pipelines/indexing/project_meta.py (2)
37-39: Document the DuckDB workaround implementationThe code includes a workaround for DuckDB that changes the data source to "local_file". While the comment explains why, it would be helpful to clarify if this is a temporary solution and when it might be resolved.
if data_source == "duckdb": - # fix duckdb to local_file due to wren-ibis implementation at the moment + # TODO: Temporary workaround - fix duckdb to local_file due to current wren-ibis implementation limitations + # This should be revisited when wren-ibis supports DuckDB natively data_source = "local_file"
66-107: Add comprehensive docstrings to the ProjectMeta class and its methodsThe class and its methods lack proper docstrings explaining their purpose, parameters, and return values. Adding docstrings would improve code readability and maintainability.
class ProjectMeta(BasicPipeline): + """ + Pipeline for processing and indexing project metadata. + + This pipeline validates, chunks, cleans, and writes project metadata to the document store. + """ def __init__( self, document_store_provider: DocumentStoreProvider, **kwargs, ) -> None: + """ + Initialize the ProjectMeta pipeline with necessary components. + + Args: + document_store_provider: Provider for the document store + **kwargs: Additional keyword arguments + """ store = document_store_provider.get_store(dataset_name="project_meta") # Rest of the initialization code... @observe(name="Project Meta Indexing") async def run( self, mdl_str: str, project_id: Optional[str] = None ) -> dict[str, Any]: + """ + Execute the project metadata indexing pipeline. + + Args: + mdl_str: Model definition language string to process + project_id: Optional project identifier + + Returns: + Dictionary containing the results of the pipeline execution + """ logger.info( f"Project ID: {project_id}, Project Meta Indexing pipeline is running..." ) # Rest of the method... @observe(name="Clean Documents for Project Meta") async def clean(self, project_id: Optional[str] = None) -> None: + """ + Clean documents for a specific project. + + Args: + project_id: Optional project identifier to clean documents for + """ await self._components["cleaner"].run(project_id=project_id)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
wren-ai-service/src/pipelines/indexing/project_meta.py(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: pytest
- GitHub Check: Analyze (go)
🔇 Additional comments (3)
wren-ai-service/src/pipelines/indexing/project_meta.py (3)
29-45: The chunk function creates an empty document without MDL content.While the function creates a Document with metadata, it doesn't actually include the MDL content in the Document, which seems to defeat the purpose of indexing project metadata.
@observe(capture_input=False) def chunk( mdl: dict[str, Any], project_id: Optional[str] = None, ) -> dict[str, Any]: addition = {"project_id": project_id} if project_id else {} data_source = mdl.get("dataSource", "local_file").lower() if data_source == "duckdb": # fix duckdb to local_file due to wren-ibis implementation at the moment data_source = "local_file" document = Document( id=str(uuid.uuid4()), + content=mdl, # Include the actual MDL content in the document meta={"data_source": data_source, **addition}, ) return {"documents": [document]}
35-35: Standardizedata_sourceKey Usage in Project MetadataIt appears that the code retrieves the data source with the key
"dataSource"and then converts it to lowercase, whereas the rest of the codebase consistently uses"data_source". For clarity and to avoid potential mismatches:- data_source = mdl.get("dataSource", "local_file").lower() + data_source = mdl.get("data_source", "local_file").lower()
48-55: The clean function doesn't actually clean the chunk content.The clean function calls the cleaner's run method but doesn't use the chunk parameter at all, making the function implementation inconsistent with its signature.
@observe(capture_input=False, capture_output=False) async def clean( chunk: dict[str, Any], cleaner: DocumentCleaner, project_id: Optional[str] = None, ) -> dict[str, Any]: await cleaner.run(project_id=project_id) + # Either use the chunk parameter in the cleaning process + # or document why it's not being used return chunk
| ## Start of Pipeline | ||
| @observe(capture_input=False, capture_output=False) | ||
| @extract_fields(dict(mdl=dict[str, Any])) | ||
| def validate_mdl(mdl_str: str, validator: MDLValidator) -> dict[str, Any]: | ||
| res = validator.run(mdl=mdl_str) | ||
| return dict(mdl=res["mdl"]) | ||
|
|
||
|
|
||
| @observe(capture_input=False) | ||
| def chunk( | ||
| mdl: dict[str, Any], | ||
| project_id: Optional[str] = None, | ||
| ) -> dict[str, Any]: | ||
| addition = {"project_id": project_id} if project_id else {} | ||
| data_source = mdl.get("dataSource", "local_file").lower() | ||
|
|
||
| if data_source == "duckdb": | ||
| # fix duckdb to local_file due to wren-ibis implementation at the moment | ||
| data_source = "local_file" | ||
|
|
||
| document = Document( | ||
| id=str(uuid.uuid4()), | ||
| meta={"data_source": data_source, **addition}, | ||
| ) | ||
| return {"documents": [document]} | ||
|
|
||
|
|
||
| @observe(capture_input=False, capture_output=False) | ||
| async def clean( | ||
| chunk: dict[str, Any], | ||
| cleaner: DocumentCleaner, | ||
| project_id: Optional[str] = None, | ||
| ) -> dict[str, Any]: | ||
| await cleaner.run(project_id=project_id) | ||
| return chunk | ||
|
|
||
|
|
||
| @observe(capture_input=False) | ||
| async def write(clean: dict[str, Any], writer: DocumentWriter) -> None: | ||
| return await writer.run(documents=clean["documents"]) | ||
|
|
||
|
|
||
| ## End of Pipeline |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Add error handling to pipeline functions
The pipeline functions lack error handling, which could lead to unclear errors or silent failures. Consider adding try-except blocks to handle potential errors and provide meaningful error messages.
Here's an example for the validate_mdl function:
@observe(capture_input=False, capture_output=False)
@extract_fields(dict(mdl=dict[str, Any]))
def validate_mdl(mdl_str: str, validator: MDLValidator) -> dict[str, Any]:
+ try:
res = validator.run(mdl=mdl_str)
return dict(mdl=res["mdl"])
+ except Exception as e:
+ logger.error(f"Error validating MDL: {str(e)}")
+ raise ValueError(f"Failed to validate MDL: {str(e)}") from eSimilar error handling should be added to other pipeline functions.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| ## Start of Pipeline | |
| @observe(capture_input=False, capture_output=False) | |
| @extract_fields(dict(mdl=dict[str, Any])) | |
| def validate_mdl(mdl_str: str, validator: MDLValidator) -> dict[str, Any]: | |
| res = validator.run(mdl=mdl_str) | |
| return dict(mdl=res["mdl"]) | |
| @observe(capture_input=False) | |
| def chunk( | |
| mdl: dict[str, Any], | |
| project_id: Optional[str] = None, | |
| ) -> dict[str, Any]: | |
| addition = {"project_id": project_id} if project_id else {} | |
| data_source = mdl.get("dataSource", "local_file").lower() | |
| if data_source == "duckdb": | |
| # fix duckdb to local_file due to wren-ibis implementation at the moment | |
| data_source = "local_file" | |
| document = Document( | |
| id=str(uuid.uuid4()), | |
| meta={"data_source": data_source, **addition}, | |
| ) | |
| return {"documents": [document]} | |
| @observe(capture_input=False, capture_output=False) | |
| async def clean( | |
| chunk: dict[str, Any], | |
| cleaner: DocumentCleaner, | |
| project_id: Optional[str] = None, | |
| ) -> dict[str, Any]: | |
| await cleaner.run(project_id=project_id) | |
| return chunk | |
| @observe(capture_input=False) | |
| async def write(clean: dict[str, Any], writer: DocumentWriter) -> None: | |
| return await writer.run(documents=clean["documents"]) | |
| ## End of Pipeline | |
| ## Start of Pipeline | |
| @observe(capture_input=False, capture_output=False) | |
| @extract_fields(dict(mdl=dict[str, Any])) | |
| def validate_mdl(mdl_str: str, validator: MDLValidator) -> dict[str, Any]: | |
| try: | |
| res = validator.run(mdl=mdl_str) | |
| return dict(mdl=res["mdl"]) | |
| except Exception as e: | |
| logger.error(f"Error validating MDL: {str(e)}") | |
| raise ValueError(f"Failed to validate MDL: {str(e)}") from e | |
| @observe(capture_input=False) | |
| def chunk( | |
| mdl: dict[str, Any], | |
| project_id: Optional[str] = None, | |
| ) -> dict[str, Any]: | |
| addition = {"project_id": project_id} if project_id else {} | |
| data_source = mdl.get("dataSource", "local_file").lower() | |
| if data_source == "duckdb": | |
| # fix duckdb to local_file due to wren-ibis implementation at the moment | |
| data_source = "local_file" | |
| document = Document( | |
| id=str(uuid.uuid4()), | |
| meta={"data_source": data_source, **addition}, | |
| ) | |
| return {"documents": [document]} | |
| @observe(capture_input=False, capture_output=False) | |
| async def clean( | |
| chunk: dict[str, Any], | |
| cleaner: DocumentCleaner, | |
| project_id: Optional[str] = None, | |
| ) -> dict[str, Any]: | |
| await cleaner.run(project_id=project_id) | |
| return chunk | |
| @observe(capture_input=False) | |
| async def write(clean: dict[str, Any], writer: DocumentWriter) -> None: | |
| return await writer.run(documents=clean["documents"]) | |
| ## End of Pipeline |
This PR introduces SQL functions support to enhance the query generation capabilities. Here are the key changes:
Added new
SqlFunctionspipeline component to retrieve and cache SQL functions from data sourcesEnhanced SQL Generation Pipelines
Schema Processing Improvements
Other Changes
These changes will enable better SQL query generation by providing context about available SQL functions to the LLM.
Summary by CodeRabbit
New Features
wren_ibisengine.SqlFunctionsentity for enhanced SQL function management.ProjectMetaclass for processing project metadata.wren_ibisacross multiple configuration files.Bug Fixes / Refactor
Tests
SqlFunctionandSqlFunctionsclasses to validate functionality.