diff --git a/crewai_tools/__init__.py b/crewai_tools/__init__.py index 85fe5ed6..9ff50396 100644 --- a/crewai_tools/__init__.py +++ b/crewai_tools/__init__.py @@ -9,6 +9,8 @@ ) from .tools import ( AIMindTool, + AirweaveAdvancedSearchTool, + AirweaveSearchTool, ApifyActorsTool, ArxivPaperTool, BraveSearchTool, diff --git a/crewai_tools/tools/__init__.py b/crewai_tools/tools/__init__.py index 2b0bb968..23060c9a 100644 --- a/crewai_tools/tools/__init__.py +++ b/crewai_tools/tools/__init__.py @@ -1,4 +1,5 @@ from .ai_mind_tool.ai_mind_tool import AIMindTool +from .airweave_tool import AirweaveAdvancedSearchTool, AirweaveSearchTool from .apify_actors_tool.apify_actors_tool import ApifyActorsTool from .arxiv_paper_tool.arxiv_paper_tool import ArxivPaperTool from .brave_search_tool.brave_search_tool import BraveSearchTool diff --git a/crewai_tools/tools/airweave_tool/README.md b/crewai_tools/tools/airweave_tool/README.md new file mode 100644 index 00000000..02786943 --- /dev/null +++ b/crewai_tools/tools/airweave_tool/README.md @@ -0,0 +1,360 @@ +# Airweave Search Tools + +Search across all your connected data sources (Stripe, GitHub, Notion, Slack, and 50+ more) using Airweave's unified search API. + +## Installation + +```bash +pip install 'crewai-tools[airweave]' +``` + +Or install the SDK directly: + +```bash +pip install airweave-sdk +``` + +## Setup + +### 1. Get your API key + +Sign up at [https://app.airweave.ai](https://app.airweave.ai) and get your API key. + +### 2. Set environment variable + +```bash +export AIRWEAVE_API_KEY="your_api_key_here" +``` + +### 3. Create a collection and connect data sources + +Through the Airweave dashboard: +1. Create a new collection +2. Add source connections (Stripe, GitHub, Notion, etc.) +3. Wait for initial sync to complete +4. Copy your collection's `readable_id` + +## Tools + +### AirweaveSearchTool + +Basic search tool for straightforward queries. 
Mirrors the `client.collections.search()` method from the Airweave Python SDK. + +**When to use:** +- Simple searches without filtering +- Quick lookups across all data sources +- When you don't need advanced features + +**Example:** + +```python +from crewai import Agent, Task, Crew +from crewai_tools import AirweaveSearchTool + +# Initialize the tool +search_tool = AirweaveSearchTool( + collection_id="my-collection-id" +) + +# Create an agent with the tool +agent = Agent( + role="Data Analyst", + goal="Find information from connected data sources", + tools=[search_tool], + verbose=True +) + +# Create a task +task = Task( + description="Find all failed payments from the last month", + agent=agent, + expected_output="List of failed payments with customer details" +) + +# Run the crew +crew = Crew(agents=[agent], tasks=[task]) +result = crew.kickoff() +``` + +**Get AI-generated answers:** + +```python +from crewai import Task + +task = Task( + description=""" + What are the most common customer complaints this month? + Use response_type='completion' to get an AI-generated summary. + """, + agent=agent +) +``` + +### AirweaveAdvancedSearchTool + +Advanced search tool with filtering, reranking, and fine-tuned control. Mirrors the `client.collections.search_advanced()` method from the Airweave Python SDK. + +**When to use:** +- Filter by specific data sources +- Prioritize recent results +- Set minimum relevance scores +- Enable AI reranking for better results +- Choose specific search methods (hybrid/neural/keyword) + +**Example:** + +```python +from crewai_tools import AirweaveAdvancedSearchTool + +# Initialize with advanced options +advanced_tool = AirweaveAdvancedSearchTool( + collection_id="my-collection-id" +) + +agent = Agent( + role="Customer Support Analyst", + goal="Find recent customer issues from specific sources", + tools=[advanced_tool] +) + +task = Task( + description=""" + Find customer complaints about billing from Zendesk in the last week. 
+ Use these parameters: + - source_filter: 'Zendesk' + - recency_bias: 0.8 + - enable_reranking: True + - score_threshold: 0.7 + """, + agent=agent +) +``` + +## Parameters + +### AirweaveSearchTool Parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `query` | str | Required | Search query to find relevant information | +| `limit` | int | 10 | Maximum number of results (1-100) | +| `response_type` | str | "raw" | "raw" for search results, "completion" for AI answer | +| `recency_bias` | float | 0.0 | Weight for recent results (0.0-1.0) | + +### AirweaveAdvancedSearchTool Parameters + +All basic parameters plus: + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `source_filter` | str | None | Filter by specific source (e.g., "Stripe", "GitHub") | +| `score_threshold` | float | None | Minimum similarity score (0.0-1.0) | +| `recency_bias` | float | 0.3 | Weight for recent results (0.0-1.0) | +| `enable_reranking` | bool | True | Enable AI reranking for better relevance | +| `search_method` | str | "hybrid" | "hybrid", "neural", or "keyword" | + +### Tool Configuration (Constructor) + +Both tools accept these configuration parameters: + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `collection_id` | str | Required | Your collection's readable ID | +| `base_url` | str | None | Custom API URL for self-hosted instances | +| `max_content_length` | int | 300 | Max characters to show per result | + +## What Can Airweave Search? + +Airweave connects to 50+ data sources including: + +### Finance & Billing +- Stripe +- QuickBooks +- Chargebee + +### Development Tools +- GitHub +- GitLab +- Jira +- Linear + +### Collaboration +- Slack +- Microsoft Teams +- Discord + +### Productivity +- Notion +- Google Drive +- Confluence +- Dropbox + +### CRM & Support +- Salesforce +- HubSpot +- Zendesk +- Intercom + +### And many more... 
+ +[See full list of integrations →](https://docs.airweave.ai/integrations) + +## Features + +✅ **Unified Search** - Search across all data sources with one query +✅ **Semantic Search** - Natural language understanding +✅ **Hybrid Search** - Combines vector and keyword search +✅ **AI Reranking** - Improves result relevance (Advanced) +✅ **AI Answers** - Get generated answers via `response_type="completion"` +✅ **Recency Bias** - Prioritize recent results +✅ **Source Filtering** - Search specific sources only (Advanced) +✅ **Score Threshold** - Filter by relevance (Advanced) +✅ **Incremental Sync** - Data stays automatically updated +✅ **Multi-tenant** - Each user has their own collections + +## Environment Variables + +| Variable | Required | Default | Description | +|----------|----------|---------|-------------| +| `AIRWEAVE_API_KEY` | Yes | - | Your Airweave API key | + +## Response Types + +### Raw Results (`response_type="raw"`) + +Returns structured search results with: +- Content snippets +- Similarity scores +- Source information +- Entity IDs +- Creation timestamps +- URLs (when available) + +### AI Completion (`response_type="completion"`) + +Returns an AI-generated natural language answer based on the retrieved documents. Use this when you want: +- Summarized information +- Direct answers to questions +- Synthesized insights from multiple sources + +## Search Methods (Advanced Tool) + +### Hybrid (Default) +Combines semantic (neural) and keyword (BM25) search for best results. + +### Neural +Pure semantic search using embeddings. Best for conceptual queries. + +### Keyword +Traditional keyword search. Best for exact term matching. 
+ +## Use Cases + +### Customer Support Agent +```python +agent = Agent( + role="Customer Support Specialist", + goal="Find and resolve customer issues", + tools=[AirweaveAdvancedSearchTool(collection_id="support-data")], + backstory="Expert at finding relevant customer data across Zendesk, Slack, and email" +) +``` + +### Sales Intelligence Agent +```python +agent = Agent( + role="Sales Intelligence Analyst", + goal="Research accounts and opportunities", + tools=[AirweaveSearchTool(collection_id="sales-data")], + backstory="Analyzes data from Salesforce, HubSpot, and LinkedIn" +) +``` + +### Financial Analysis Agent +```python +agent = Agent( + role="Financial Analyst", + goal="Analyze payment trends and issues", + tools=[AirweaveAdvancedSearchTool(collection_id="finance-data")], + backstory="Tracks payments, invoices, and transactions from Stripe and QuickBooks" +) +``` + +### Technical Documentation Agent +```python +agent = Agent( + role="Documentation Specialist", + goal="Answer technical questions from internal docs", + tools=[AirweaveSearchTool(collection_id="docs-collection")], + backstory="Searches through Notion, Confluence, and GitHub repos" +) +``` + +## Error Handling + +The tools handle errors gracefully and return clear error messages: + +- **Missing API key** - Clear instruction on how to set it +- **Collection not found** - Verifies collection exists +- **No results** - Suggests rephrasing query +- **API errors** - Returns error details for debugging + +## Advanced Filtering Examples + +### Filter by Multiple Sources (Future Enhancement) + +Currently supports single source filtering. 
For multiple sources, build the filter yourself using the SDK types: + +```python +from airweave import Filter, FieldCondition, MatchAny + +# In your agent's task description, specify the filter logic +# The tool currently supports only a single `source_filter` value +``` + +### Combine Multiple Filters + +For complex filtering needs beyond a single source, consider using the Airweave SDK directly within a custom tool or making multiple search calls. + +## Best Practices + +1. **Start with Basic Search** - Use `AirweaveSearchTool` for most queries +2. **Use Advanced When Needed** - Switch to `AirweaveAdvancedSearchTool` when you need filtering or reranking +3. **Set Appropriate Limits** - Higher limits for comprehensive searches, lower for quick lookups +4. **Enable Reranking for Complex Queries** - Better results for nuanced questions +5. **Use Recency Bias for Time-Sensitive Data** - Great for support tickets, recent transactions +6. **Choose Response Type Based on Need** - "raw" for structured data, "completion" for answers +7. 
**Filter by Source for Focused Searches** - Reduces noise when you know the source + +## Troubleshooting + +### "No results found" +- Check that your collection has data synced +- Verify sync jobs completed successfully +- Try broader search terms +- Lower score_threshold if using advanced search + +### "Unable to generate an answer" +- Ensure you have relevant data in your collection +- Try response_type="raw" to see what results are available +- Rephrase your query to be more specific + +### Import errors +```bash +pip install --upgrade airweave-sdk crewai-tools +``` + +## Learn More + +- [Airweave Documentation](https://docs.airweave.ai) +- [API Reference](https://docs.airweave.ai/api-reference) +- [Python SDK](https://github.com/airweave-ai/python-sdk) +- [Get API Key](https://app.airweave.ai) +- [CrewAI Documentation](https://docs.crewai.com) + +## Support + +- Discord: [Join Airweave Community](https://discord.gg/airweave) +- Email: support@airweave.ai +- GitHub Issues: [airweave-ai/python-sdk](https://github.com/airweave-ai/python-sdk/issues) + diff --git a/crewai_tools/tools/airweave_tool/__init__.py b/crewai_tools/tools/airweave_tool/__init__.py new file mode 100644 index 00000000..1a41826f --- /dev/null +++ b/crewai_tools/tools/airweave_tool/__init__.py @@ -0,0 +1,10 @@ +"""Airweave search tools for CrewAI.""" + +from .airweave_search_tool import AirweaveSearchTool +from .airweave_advanced_search_tool import AirweaveAdvancedSearchTool + +__all__ = [ + "AirweaveSearchTool", + "AirweaveAdvancedSearchTool", +] + diff --git a/crewai_tools/tools/airweave_tool/airweave_advanced_search_tool.py b/crewai_tools/tools/airweave_tool/airweave_advanced_search_tool.py new file mode 100644 index 00000000..bb664505 --- /dev/null +++ b/crewai_tools/tools/airweave_tool/airweave_advanced_search_tool.py @@ -0,0 +1,357 @@ +"""Airweave Advanced Search Tool with filtering and reranking.""" + +import os +from typing import Any, List, Optional, Type + +from crewai.tools import 
import os
from typing import Any, List, Optional, Type

from crewai.tools import BaseTool, EnvVar
from pydantic import BaseModel, Field

# Values this tool accepts; anything else silently falls back to the first
# entry so that a slightly-off argument from an LLM does not abort the search.
_RESPONSE_TYPES = ("raw", "completion")
_SEARCH_METHODS = ("hybrid", "neural", "keyword")


class AirweaveAdvancedSearchToolSchema(BaseModel):
    """Input schema for AirweaveAdvancedSearchTool."""

    query: str = Field(
        ...,
        description="The search query to find relevant information"
    )
    limit: Optional[int] = Field(
        default=10,
        ge=1,
        le=100,
        description="Maximum number of results to return (1-100)"
    )
    offset: Optional[int] = Field(
        default=0,
        ge=0,
        description="Number of results to skip for pagination"
    )
    response_type: Optional[str] = Field(
        default="raw",
        description="Response format: 'raw' for search results or 'completion' for AI-generated answer"
    )
    source_filter: Optional[str] = Field(
        default=None,
        description="Filter by specific source name (e.g., 'Stripe', 'GitHub', 'Slack')"
    )
    score_threshold: Optional[float] = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Minimum similarity score threshold (0.0-1.0)"
    )
    recency_bias: Optional[float] = Field(
        default=0.3,
        ge=0.0,
        le=1.0,
        description="Weight for recent results (0.0=no bias, 1.0=only recency). Default: 0.3"
    )
    enable_reranking: Optional[bool] = Field(
        default=True,
        description="Enable AI reranking for better relevance"
    )
    search_method: Optional[str] = Field(
        default="hybrid",
        description="Search method: 'hybrid' (default), 'neural', or 'keyword'"
    )


class AirweaveAdvancedSearchTool(BaseTool):
    """Advanced search across an Airweave collection with filtering and reranking.

    Capabilities exposed through the input schema:
    - Source filtering (exact match on the ``source_name`` payload field)
    - Recency bias (prioritize recent results)
    - Score threshold filtering
    - AI-powered reranking
    - Choice of search method (hybrid, neural, keyword)

    Calls ``client.collections.search_advanced()`` from the Airweave Python SDK.
    Failures that occur while searching are returned as a message string
    instead of being raised, so the calling agent can read and react to them.
    """

    model_config = {"arbitrary_types_allowed": True}

    name: str = "Airweave Advanced Search"
    description: str = (
        "Advanced search with filtering and AI enhancements. Use this when you need to: "
        "filter by specific sources, prioritize recent results, set minimum relevance scores, "
        "enable AI reranking, or use specific search methods (hybrid/neural/keyword)."
    )
    args_schema: Type[BaseModel] = AirweaveAdvancedSearchToolSchema

    # Required configuration
    collection_id: str = Field(
        ...,
        description="The readable ID of the Airweave collection to search"
    )

    # Optional configuration
    base_url: Optional[str] = Field(
        default=None,
        description="Custom Airweave API base URL"
    )
    max_content_length: int = Field(
        default=300,
        description="Maximum content length to display per result"
    )

    # Dependencies
    package_dependencies: List[str] = ["airweave-sdk"]
    env_vars: List[EnvVar] = [
        EnvVar(
            name="AIRWEAVE_API_KEY",
            description="API key for Airweave",
            required=True
        ),
    ]

    def __init__(self, **kwargs: Any) -> None:
        """Validate configuration and create the synchronous Airweave client.

        Raises:
            ImportError: If the ``airweave-sdk`` package is not installed.
            ValueError: If ``AIRWEAVE_API_KEY`` is not set.
        """
        super().__init__(**kwargs)

        # Imported lazily so the SDK stays an optional dependency.
        try:
            from airweave import AirweaveSDK
        except ImportError as exc:
            raise ImportError(
                "Missing required package 'airweave-sdk'. Install with:\n"
                " pip install airweave-sdk\n"
                "or\n"
                " pip install 'crewai-tools[airweave]'"
            ) from exc

        self._client = AirweaveSDK(**self._client_kwargs())

    def _client_kwargs(self) -> dict:
        """Build the constructor arguments shared by the sync and async clients.

        Raises:
            ValueError: If ``AIRWEAVE_API_KEY`` is not set.
        """
        api_key = os.getenv("AIRWEAVE_API_KEY")
        if not api_key:
            raise ValueError(
                "AIRWEAVE_API_KEY environment variable is required."
            )

        # Best effort: the version is only forwarded as ``framework_version``,
        # so a failed lookup must never stop the tool from working.
        try:
            from importlib.metadata import version
            package_version = version("crewai-tools")
        except Exception:
            package_version = "unknown"

        client_kwargs = {
            "api_key": api_key,
            "framework_name": "crewai",
            "framework_version": package_version,
        }
        if self.base_url:
            client_kwargs["base_url"] = self.base_url
        return client_kwargs

    def _search_kwargs(
        self,
        query: str,
        limit: int,
        offset: int,
        response_type: str,
        source_filter: Optional[str],
        score_threshold: Optional[float],
        recency_bias: float,
        enable_reranking: bool,
        search_method: str,
    ) -> dict:
        """Normalize the arguments and return kwargs for ``search_advanced``."""
        if response_type not in _RESPONSE_TYPES:
            response_type = "raw"
        if search_method not in _SEARCH_METHODS:
            search_method = "hybrid"

        filter_obj = None
        if source_filter:
            from airweave import FieldCondition, Filter, MatchValue

            filter_obj = Filter(
                must=[
                    FieldCondition(
                        key="source_name",
                        match=MatchValue(value=source_filter)
                    )
                ]
            )

        return {
            "readable_id": self.collection_id,
            "query": query,
            "limit": limit,
            "offset": offset,
            "score_threshold": score_threshold,
            "recency_bias": recency_bias,
            "enable_reranking": enable_reranking,
            "search_method": search_method,
            "filter": filter_obj,
            "response_type": response_type,
        }

    def _render_response(
        self,
        response: Any,
        response_type: str,
        limit: int,
        source_filter: Optional[str],
    ) -> str:
        """Turn an SDK response into the text handed back to the agent."""
        if response_type == "completion":
            if response.completion:
                return response.completion
            return "Unable to generate an answer from available data. Try rephrasing your question."

        if response.status == "no_results":
            return "No results found for your query."

        if response.status == "no_relevant_results":
            return "Search completed but no sufficiently relevant results were found. Try adjusting filters or threshold."

        return self._format_results(response.results, limit, source_filter)

    def _run(
        self,
        query: str,
        limit: int = 10,
        offset: int = 0,
        response_type: str = "raw",
        source_filter: Optional[str] = None,
        score_threshold: Optional[float] = None,
        recency_bias: float = 0.3,
        enable_reranking: bool = True,
        search_method: str = "hybrid",
        **kwargs: Any
    ) -> str:
        """Execute an advanced search synchronously.

        Args:
            query: Search query string.
            limit: Maximum number of results to return.
            offset: Number of results to skip for pagination.
            response_type: 'raw' or 'completion'; other values fall back to 'raw'.
            source_filter: If set, only results whose ``source_name`` matches.
            score_threshold: Minimum similarity score, or None for no threshold.
            recency_bias: Weight for recent results (0.0-1.0).
            enable_reranking: Whether to request AI reranking.
            search_method: 'hybrid', 'neural' or 'keyword'; other values fall
                back to 'hybrid'.

        Returns:
            Formatted results, the AI completion, or an error message string.
        """
        try:
            request = self._search_kwargs(
                query, limit, offset, response_type, source_filter,
                score_threshold, recency_bias, enable_reranking, search_method,
            )
            response = self._client.collections.search_advanced(**request)
            return self._render_response(
                response, request["response_type"], limit, source_filter
            )
        except Exception as e:
            return f"Error performing advanced search: {str(e)}"

    async def _arun(
        self,
        query: str,
        limit: int = 10,
        offset: int = 0,
        response_type: str = "raw",
        source_filter: Optional[str] = None,
        score_threshold: Optional[float] = None,
        recency_bias: float = 0.3,
        enable_reranking: bool = True,
        search_method: str = "hybrid",
        **kwargs: Any
    ) -> str:
        """Async counterpart of ``_run``; same arguments and return value.

        The async SDK client is created on first use and then reused. Client
        creation happens inside the error boundary so that a missing API key
        or SDK is reported as a message, like any other search failure.
        """
        try:
            client = getattr(self, "_async_client", None)
            if client is None:
                from airweave import AsyncAirweaveSDK

                client = AsyncAirweaveSDK(**self._client_kwargs())
                self._async_client = client

            request = self._search_kwargs(
                query, limit, offset, response_type, source_filter,
                score_threshold, recency_bias, enable_reranking, search_method,
            )
            response = await client.collections.search_advanced(**request)
            return self._render_response(
                response, request["response_type"], limit, source_filter
            )
        except Exception as e:
            return f"Error in async advanced search: {str(e)}"

    def _format_results(
        self,
        results: List[dict],
        limit: int,
        source_filter: Optional[str] = None
    ) -> str:
        """Format raw search results as readable text.

        Args:
            results: Result dictionaries with optional ``score`` and ``payload``.
            limit: Maximum number of results to render.
            source_filter: Source name to mention in the header, if any.

        Returns:
            A header line followed by one section per rendered result, or
            "No results found." when ``results`` is empty.
        """
        if not results:
            return "No results found."

        shown = results[:limit]

        # Count what is actually rendered, not what was received.
        header = f"Found {len(shown)} result(s)"
        if source_filter:
            header += f" from {source_filter}"
        header += ":\n"

        formatted = [header]
        metadata_fields = (
            ("Source", "source_name"),
            ("Entity ID", "entity_id"),
            ("Created", "created_at"),
            ("URL", "url"),
        )

        for idx, result in enumerate(shown, 1):
            # ``or`` also covers keys that are present but set to None.
            payload = result.get("payload") or {}
            score = result.get("score") or 0.0

            formatted.append(f"\n--- Result {idx} (Score: {score:.3f}) ---")

            content = payload.get("md_content", "")
            if content:
                if len(content) > self.max_content_length:
                    content = content[:self.max_content_length] + "..."
                formatted.append(f"Content: {content}")

            for label, key in metadata_fields:
                if key in payload:
                    formatted.append(f"{label}: {payload[key]}")

        return "\n".join(formatted)
import os
from typing import Any, List, Optional, Type

from crewai.tools import BaseTool, EnvVar
from pydantic import BaseModel, Field


class AirweaveSearchToolSchema(BaseModel):
    """Input schema for AirweaveSearchTool."""

    query: str = Field(
        ...,
        description="The search query to find relevant information from your connected data sources"
    )
    limit: Optional[int] = Field(
        default=10,
        ge=1,
        le=100,
        description="Maximum number of results to return (1-100)"
    )
    offset: Optional[int] = Field(
        default=0,
        ge=0,
        description="Number of results to skip for pagination"
    )
    response_type: Optional[str] = Field(
        default="raw",
        description="Response format: 'raw' for search results or 'completion' for AI-generated answer"
    )
    recency_bias: Optional[float] = Field(
        default=0.0,
        ge=0.0,
        le=1.0,
        description="Weight for recent results (0.0=no bias, 1.0=only recency)"
    )


class AirweaveSearchTool(BaseTool):
    """Search across all connected data sources in an Airweave collection.

    Calls ``client.collections.search()`` from the Airweave Python SDK. Use it
    for straightforward searches; for filtering and reranking use
    AirweaveAdvancedSearchTool.

    Failures that occur while searching are returned as a message string
    instead of being raised, so the calling agent can read and react to them.
    """

    model_config = {"arbitrary_types_allowed": True}

    name: str = "Airweave Search"
    description: str = (
        "Search across all connected data sources in your Airweave collection. "
        "Use this to find information from Stripe, GitHub, Notion, Slack, and other integrated apps. "
        "Supports both raw search results and AI-generated answers via response_type parameter."
    )
    args_schema: Type[BaseModel] = AirweaveSearchToolSchema

    # Required configuration
    collection_id: str = Field(
        ...,
        description="The readable ID of the Airweave collection to search"
    )

    # Optional configuration
    base_url: Optional[str] = Field(
        default=None,
        description="Custom Airweave API base URL (for self-hosted instances)"
    )
    max_content_length: int = Field(
        default=300,
        description="Maximum content length to display per result"
    )

    # Dependencies
    package_dependencies: List[str] = ["airweave-sdk"]
    env_vars: List[EnvVar] = [
        EnvVar(
            name="AIRWEAVE_API_KEY",
            description="API key for Airweave (get from https://app.airweave.ai)",
            required=True
        ),
    ]

    def __init__(self, **kwargs: Any) -> None:
        """Validate configuration and create the synchronous Airweave client.

        Raises:
            ImportError: If the ``airweave-sdk`` package is not installed.
            ValueError: If ``AIRWEAVE_API_KEY`` is not set.
        """
        super().__init__(**kwargs)

        # Imported lazily so the SDK stays an optional dependency.
        try:
            from airweave import AirweaveSDK
        except ImportError as exc:
            raise ImportError(
                "Missing required package 'airweave-sdk'. Install with:\n"
                " pip install airweave-sdk\n"
                "or\n"
                " pip install 'crewai-tools[airweave]'"
            ) from exc

        self._client = AirweaveSDK(**self._client_kwargs())

    def _client_kwargs(self) -> dict:
        """Build the constructor arguments shared by the sync and async clients.

        Raises:
            ValueError: If ``AIRWEAVE_API_KEY`` is not set.
        """
        api_key = os.getenv("AIRWEAVE_API_KEY")
        if not api_key:
            raise ValueError(
                "AIRWEAVE_API_KEY environment variable is required. "
                "Get your API key from https://app.airweave.ai"
            )

        # Best effort: the version is only forwarded as ``framework_version``,
        # so a failed lookup must never stop the tool from working.
        try:
            from importlib.metadata import version
            package_version = version("crewai-tools")
        except Exception:
            package_version = "unknown"

        client_kwargs = {
            "api_key": api_key,
            "framework_name": "crewai",
            "framework_version": package_version,
        }
        if self.base_url:
            client_kwargs["base_url"] = self.base_url
        return client_kwargs

    def _search_kwargs(
        self,
        query: str,
        limit: int,
        offset: int,
        response_type: str,
        recency_bias: float,
    ) -> dict:
        """Normalize the arguments and return kwargs for ``collections.search``."""
        # Unknown response types fall back to raw results rather than failing.
        if response_type not in ("raw", "completion"):
            response_type = "raw"

        return {
            "readable_id": self.collection_id,
            "query": query,
            "limit": limit,
            "offset": offset,
            "response_type": response_type,
            "recency_bias": recency_bias,
        }

    def _render_response(self, response: Any, response_type: str, limit: int) -> str:
        """Turn an SDK response into the text handed back to the agent."""
        if response_type == "completion":
            if response.completion:
                return response.completion
            return "Unable to generate an answer from available data. Try rephrasing your question."

        if response.status == "no_results":
            return "No results found for your query."

        if response.status == "no_relevant_results":
            return "Search completed but no sufficiently relevant results were found. Try rephrasing your query."

        return self._format_results(response.results, limit)

    def _run(
        self,
        query: str,
        limit: int = 10,
        offset: int = 0,
        response_type: str = "raw",
        recency_bias: float = 0.0,
        **kwargs: Any
    ) -> str:
        """Execute a search synchronously.

        Args:
            query: Search query string.
            limit: Maximum number of results to return.
            offset: Number of results to skip for pagination.
            response_type: 'raw' or 'completion'; other values fall back to 'raw'.
            recency_bias: Weight for recent results (0.0-1.0).

        Returns:
            Formatted results, the AI completion, or an error message string.
        """
        try:
            request = self._search_kwargs(
                query, limit, offset, response_type, recency_bias
            )
            response = self._client.collections.search(**request)
            return self._render_response(response, request["response_type"], limit)
        except Exception as e:
            return f"Error performing search: {str(e)}"

    async def _arun(
        self,
        query: str,
        limit: int = 10,
        offset: int = 0,
        response_type: str = "raw",
        recency_bias: float = 0.0,
        **kwargs: Any
    ) -> str:
        """Async counterpart of ``_run``; same arguments and return value.

        The async SDK client is created on first use and then reused. Client
        creation happens inside the error boundary so that a missing API key
        or SDK is reported as a message, like any other search failure.
        """
        try:
            client = getattr(self, "_async_client", None)
            if client is None:
                from airweave import AsyncAirweaveSDK

                client = AsyncAirweaveSDK(**self._client_kwargs())
                self._async_client = client

            request = self._search_kwargs(
                query, limit, offset, response_type, recency_bias
            )
            response = await client.collections.search(**request)
            return self._render_response(response, request["response_type"], limit)
        except Exception as e:
            return f"Error performing async search: {str(e)}"

    def _format_results(self, results: List[dict], limit: int) -> str:
        """Format raw search results as readable text.

        Args:
            results: Result dictionaries with optional ``score`` and ``payload``.
            limit: Maximum number of results to render.

        Returns:
            A header line followed by one section per rendered result, or
            "No results found." when ``results`` is empty.
        """
        if not results:
            return "No results found."

        shown = results[:limit]

        # Count what is actually rendered, not what was received.
        formatted = [f"Found {len(shown)} result(s):\n"]
        metadata_fields = (
            ("Source", "source_name"),
            ("Entity ID", "entity_id"),
            ("Created", "created_at"),
            ("URL", "url"),
        )

        for idx, result in enumerate(shown, 1):
            # ``or`` also covers keys that are present but set to None.
            payload = result.get("payload") or {}
            score = result.get("score") or 0.0

            formatted.append(f"\n--- Result {idx} (Score: {score:.3f}) ---")

            # Content (truncated if too long)
            content = payload.get("md_content", "")
            if content:
                if len(content) > self.max_content_length:
                    content = content[:self.max_content_length] + "..."
                formatted.append(f"Content: {content}")

            for label, key in metadata_fields:
                if key in payload:
                    formatted.append(f"{label}: {payload[key]}")

        return "\n".join(formatted)
from unittest.mock import Mock, patch

import pytest

from crewai_tools.tools.airweave_tool import (
    AirweaveAdvancedSearchTool,
    AirweaveSearchTool,
)

# The tools import the SDK lazily inside ``__init__`` (``from airweave import
# AirweaveSDK``), so the name never exists on the tool module. It has to be
# patched where it is looked up: on the ``airweave`` package itself.
SDK_PATCH_TARGET = "airweave.AirweaveSDK"


@pytest.fixture
def mock_env(monkeypatch):
    """Set up environment variables."""
    monkeypatch.setenv("AIRWEAVE_API_KEY", "test_api_key_12345")


@pytest.fixture
def mock_search_response():
    """Create mock search response with raw results."""
    return Mock(
        status="success",
        results=[
            {
                "score": 0.95,
                "payload": {
                    "md_content": "Test content from Stripe about a customer payment",
                    "source_name": "Stripe",
                    "entity_id": "cus_123",
                    "created_at": "2024-01-15T10:00:00Z",
                    "url": "https://stripe.com/customers/cus_123"
                }
            },
            {
                "score": 0.87,
                "payload": {
                    "md_content": "GitHub issue about payment integration bug",
                    "source_name": "GitHub",
                    "entity_id": "issue_456",
                    "created_at": "2024-01-14T15:30:00Z"
                }
            }
        ],
        response_type="raw",
        completion=None
    )


@pytest.fixture
def mock_completion_response():
    """Create mock search response with completion."""
    return Mock(
        status="success",
        results=[],
        response_type="completion",
        completion="Based on the data from Stripe and GitHub, there were 3 failed payments in the last month due to expired cards."
    )


@pytest.fixture
def mock_no_results_response():
    """Create mock response with no results."""
    return Mock(
        status="no_results",
        results=[],
        response_type="raw",
        completion=None
    )


class TestAirweaveSearchTool:
    """Tests for AirweaveSearchTool."""

    def test_requires_api_key(self, monkeypatch):
        """Test that tool requires API key."""
        monkeypatch.delenv("AIRWEAVE_API_KEY", raising=False)
        with pytest.raises(ValueError, match="AIRWEAVE_API_KEY"):
            AirweaveSearchTool(collection_id="test-collection")

    def test_initialization_with_valid_api_key(self, mock_env):
        """Test successful initialization with API key."""
        with patch(SDK_PATCH_TARGET):
            tool = AirweaveSearchTool(collection_id="test-collection")
            assert tool.collection_id == "test-collection"
            assert tool.name == "Airweave Search"

    def test_basic_search_raw_results(self, mock_env, mock_search_response):
        """Test basic search with raw results."""
        with patch(SDK_PATCH_TARGET) as MockSDK:
            mock_client = Mock()
            mock_client.collections.search.return_value = mock_search_response
            MockSDK.return_value = mock_client

            tool = AirweaveSearchTool(collection_id="test-collection")
            result = tool.run(query="find failed payments", limit=5)

            # The tool always forwards offset and recency_bias, using the
            # defaults from ``_run`` (0 and 0.0) when the caller omits them.
            mock_client.collections.search.assert_called_once_with(
                readable_id="test-collection",
                query="find failed payments",
                limit=5,
                offset=0,
                response_type="raw",
                recency_bias=0.0
            )

            # Verify result format
            assert "Found 2 result" in result
            assert "Test content from Stripe" in result
            assert "Stripe" in result
            assert "0.950" in result
            assert "GitHub issue" in result

    def test_search_with_completion(self, mock_env, mock_completion_response):
        """Test search requesting AI-generated completion."""
        with patch(SDK_PATCH_TARGET) as MockSDK:
            mock_client = Mock()
            mock_client.collections.search.return_value = mock_completion_response
            MockSDK.return_value = mock_client

            tool = AirweaveSearchTool(collection_id="test-collection")
            result = tool.run(query="what are the payment issues?", response_type="completion")

            # Verify API call (defaults filled in by ``_run``)
            mock_client.collections.search.assert_called_once_with(
                readable_id="test-collection",
                query="what are the payment issues?",
                limit=10,
                offset=0,
                response_type="completion",
                recency_bias=0.0
            )

            # Verify completion response
            assert "Based on the data from Stripe and GitHub" in result
            assert "3 failed payments" in result

    def test_no_results_handling(self, mock_env, mock_no_results_response):
        """Test handling of no results."""
        with patch(SDK_PATCH_TARGET) as MockSDK:
            mock_client = Mock()
            mock_client.collections.search.return_value = mock_no_results_response
            MockSDK.return_value = mock_client

            tool = AirweaveSearchTool(collection_id="test-collection")
            result = tool.run(query="nonexistent query")

            assert "No results found" in result

    def test_no_relevant_results_handling(self, mock_env):
        """Test handling of no relevant results."""
        mock_response = Mock(
            status="no_relevant_results",
            results=[],
            response_type="raw",
            completion=None
        )

        with patch(SDK_PATCH_TARGET) as MockSDK:
            mock_client = Mock()
            mock_client.collections.search.return_value = mock_response
            MockSDK.return_value = mock_client

            tool = AirweaveSearchTool(collection_id="test-collection")
            result = tool.run(query="vague query")

            assert "no sufficiently relevant results" in result

    def test_error_handling(self, mock_env):
        """Test API error handling."""
        with patch(SDK_PATCH_TARGET) as MockSDK:
            mock_client = Mock()
            mock_client.collections.search.side_effect = Exception("API Error: Collection not found")
            MockSDK.return_value = mock_client

            tool = AirweaveSearchTool(collection_id="test-collection")
            result = tool.run(query="test query")

            assert "Error performing search" in result
            assert "API Error" in result

    def test_custom_base_url(self, mock_env):
        """Test initialization with custom base URL."""
        with patch(SDK_PATCH_TARGET) as MockSDK:
            AirweaveSearchTool(
                collection_id="test-collection",
                base_url="http://localhost:8001"
            )

            # Verify SDK initialized with custom URL
            MockSDK.assert_called_once()
            call_kwargs = MockSDK.call_args[1]
            assert call_kwargs["base_url"] == "http://localhost:8001"

    def test_recency_bias(self, mock_env, mock_search_response):
        """Test search with recency bias."""
        with patch(SDK_PATCH_TARGET) as MockSDK:
            mock_client = Mock()
            mock_client.collections.search.return_value = mock_search_response
            MockSDK.return_value = mock_client

            tool = AirweaveSearchTool(collection_id="test-collection")
            tool.run(query="recent issues", recency_bias=0.8)

            # Verify recency_bias passed correctly
            call_kwargs = mock_client.collections.search.call_args[1]
            assert call_kwargs["recency_bias"] == 0.8
patch("crewai_tools.tools.airweave_tool.airweave_advanced_search_tool.AirweaveSDK"): + tool = AirweaveAdvancedSearchTool(collection_id="test-collection") + assert tool.collection_id == "test-collection" + assert tool.name == "Airweave Advanced Search" + + def test_advanced_search_with_source_filter(self, mock_env, mock_search_response): + """Test advanced search with source filtering.""" + with patch("crewai_tools.tools.airweave_tool.airweave_advanced_search_tool.AirweaveSDK") as MockSDK: + mock_client = Mock() + mock_client.collections.search_advanced.return_value = mock_search_response + MockSDK.return_value = mock_client + + tool = AirweaveAdvancedSearchTool(collection_id="test-collection") + result = tool.run( + query="payment issues", + source_filter="Stripe", + limit=10, + enable_reranking=True + ) + + # Verify API call with filter + mock_client.collections.search_advanced.assert_called_once() + call_kwargs = mock_client.collections.search_advanced.call_args[1] + assert call_kwargs["query"] == "payment issues" + assert call_kwargs["limit"] == 10 + assert call_kwargs["enable_reranking"] is True + assert call_kwargs["filter"] is not None # Filter object created + + # Verify result includes source info + assert "from Stripe" in result + + def test_advanced_search_with_score_threshold(self, mock_env, mock_search_response): + """Test advanced search with score threshold.""" + with patch("crewai_tools.tools.airweave_tool.airweave_advanced_search_tool.AirweaveSDK") as MockSDK: + mock_client = Mock() + mock_client.collections.search_advanced.return_value = mock_search_response + MockSDK.return_value = mock_client + + tool = AirweaveAdvancedSearchTool(collection_id="test-collection") + tool.run(query="test", score_threshold=0.8) + + call_kwargs = mock_client.collections.search_advanced.call_args[1] + assert call_kwargs["score_threshold"] == 0.8 + + def test_advanced_search_with_search_method(self, mock_env, mock_search_response): + """Test advanced search with 
different search methods.""" + with patch("crewai_tools.tools.airweave_tool.airweave_advanced_search_tool.AirweaveSDK") as MockSDK: + mock_client = Mock() + mock_client.collections.search_advanced.return_value = mock_search_response + MockSDK.return_value = mock_client + + tool = AirweaveAdvancedSearchTool(collection_id="test-collection") + + # Test neural search + tool.run(query="test", search_method="neural") + call_kwargs = mock_client.collections.search_advanced.call_args[1] + assert call_kwargs["search_method"] == "neural" + + def test_advanced_search_completion_mode(self, mock_env, mock_completion_response): + """Test advanced search with completion response type.""" + with patch("crewai_tools.tools.airweave_tool.airweave_advanced_search_tool.AirweaveSDK") as MockSDK: + mock_client = Mock() + mock_client.collections.search_advanced.return_value = mock_completion_response + MockSDK.return_value = mock_client + + tool = AirweaveAdvancedSearchTool(collection_id="test-collection") + result = tool.run( + query="summarize payment issues", + response_type="completion", + source_filter="Stripe" + ) + + assert "Based on the data" in result + assert "3 failed payments" in result + + def test_advanced_search_no_results(self, mock_env, mock_no_results_response): + """Test advanced search with no results.""" + with patch("crewai_tools.tools.airweave_tool.airweave_advanced_search_tool.AirweaveSDK") as MockSDK: + mock_client = Mock() + mock_client.collections.search_advanced.return_value = mock_no_results_response + MockSDK.return_value = mock_client + + tool = AirweaveAdvancedSearchTool(collection_id="test-collection") + result = tool.run(query="nonexistent", score_threshold=0.99) + + assert "No results found" in result + + def test_advanced_search_error_handling(self, mock_env): + """Test advanced search error handling.""" + with patch("crewai_tools.tools.airweave_tool.airweave_advanced_search_tool.AirweaveSDK") as MockSDK: + mock_client = Mock() + 
mock_client.collections.search_advanced.side_effect = Exception("Filter error") + MockSDK.return_value = mock_client + + tool = AirweaveAdvancedSearchTool(collection_id="test-collection") + result = tool.run(query="test") + + assert "Error performing advanced search" in result + + def test_recency_bias_default(self, mock_env, mock_search_response): + """Test that advanced search has default recency bias of 0.3.""" + with patch("crewai_tools.tools.airweave_tool.airweave_advanced_search_tool.AirweaveSDK") as MockSDK: + mock_client = Mock() + mock_client.collections.search_advanced.return_value = mock_search_response + MockSDK.return_value = mock_client + + tool = AirweaveAdvancedSearchTool(collection_id="test-collection") + tool.run(query="test") + + call_kwargs = mock_client.collections.search_advanced.call_args[1] + assert call_kwargs["recency_bias"] == 0.3 +