From 6f1131339b17f9229df254bd04c4d164c0108f5b Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Tue, 23 Dec 2025 17:58:03 +0530 Subject: [PATCH 1/7] message_processor function as tool --- .../functions/CreateFunctionDialog.tsx | 1 + .../server/apps/floware/floware/server.py | 12 ++ .../workflow_job/workflow_job/main.py | 31 +++- .../agents_module/agents_container.py | 9 + .../services/agent_crud_service.py | 104 +++++++++++- .../services/agent_inference_service.py | 101 +++++++++++- .../services/message_processor_service.py | 2 +- .../tools_module/available_tools.json | 10 ++ .../message_processor/provider.py | 156 ++++++++++++++++++ .../registry/function_registry.py | 4 + .../registries/message_processor_registry.py | 16 ++ .../tools_module/registry/tool_loader.py | 5 +- .../tools_module/tools_container.py | 12 ++ .../utils/message_processor_fn.py | 16 +- 14 files changed, 454 insertions(+), 25 deletions(-) create mode 100644 wavefront/server/modules/tools_module/tools_module/message_processor/provider.py create mode 100644 wavefront/server/modules/tools_module/tools_module/registry/registries/message_processor_registry.py diff --git a/wavefront/client/src/pages/apps/[appId]/functions/CreateFunctionDialog.tsx b/wavefront/client/src/pages/apps/[appId]/functions/CreateFunctionDialog.tsx index aedf38a1..ce484747 100644 --- a/wavefront/client/src/pages/apps/[appId]/functions/CreateFunctionDialog.tsx +++ b/wavefront/client/src/pages/apps/[appId]/functions/CreateFunctionDialog.tsx @@ -35,6 +35,7 @@ const createMessageProcessorSchema = z.object({ type CreateMessageProcessorInput = z.infer; const defaultYamlContent = `type: javascript +description: double the number function: code: | export default function(input) { diff --git a/wavefront/server/apps/floware/floware/server.py b/wavefront/server/apps/floware/floware/server.py index 5ff8a377..caa4f38c 100644 --- a/wavefront/server/apps/floware/floware/server.py +++ b/wavefront/server/apps/floware/floware/server.py @@ -143,10 +143,20 @@ product_analysis_container = ProductAnalysisContainer() +cloud_provider = os.getenv('CLOUD_PROVIDER', 'aws') +bucket_name = ( + os.getenv('AWS_GOLD_ASSET_BUCKET_NAME') + if cloud_provider == 'aws' + else os.getenv('GCP_ASSET_STORAGE_BUCKET') +) + tools_container = ToolsContainer( datasource_repository=db_repo_container.datasource_repository, knowledge_base_repository=db_repo_container.knowledge_base_repository, knowledge_base_inference_repository=db_repo_container.knowledge_base_inference_repository, + message_processor_repository=plugins_container.message_processor_repository, + cloud_manager=common_container.cloud_storage_manager, + message_processor_bucket_name=bucket_name, ) agents_container = AgentsContainer( @@ -159,6 +169,8 @@ namespace_repository=db_repo_container.namespace_repository, agent_repository=db_repo_container.agent_repository, workflow_repository=db_repo_container.workflow_repository, + message_processor_repository=plugins_container.message_processor_repository, + message_processor_bucket_name=bucket_name, ) inference_container = InferenceContainer( diff --git a/wavefront/server/background_jobs/workflow_job/workflow_job/main.py b/wavefront/server/background_jobs/workflow_job/workflow_job/main.py index 2f7afa15..2bbf20b6 100644 --- a/wavefront/server/background_jobs/workflow_job/workflow_job/main.py +++ b/wavefront/server/background_jobs/workflow_job/workflow_job/main.py @@ -31,7 +31,28 @@ response_formatter=common_container.response_formatter, ) -tools_container = ToolsContainer() +plugins_container = PluginsContainer( + db_client=db_repo_container.db_client, + cloud_manager=common_container.cloud_storage_manager, + dynamic_query_repository=db_repo_container.dynamic_query_repository, + cache_manager=db_repo_container.cache_manager, +) + +cloud_provider = config['cloud_config']['cloud_provider'] +bucket_name = ( + config['aws']['aws_asset_storage_bucket'] + if cloud_provider == 'aws' + else config['gcp']['gcp_asset_storage_bucket'] +) + +tools_container = ToolsContainer( + datasource_repository=db_repo_container.datasource_repository, + knowledge_base_repository=db_repo_container.knowledge_base_repository, + knowledge_base_inference_repository=db_repo_container.knowledge_base_inference_repository, + message_processor_repository=plugins_container.message_processor_repository, + cloud_manager=common_container.cloud_storage_manager, + bucket_name=bucket_name, +) agents_container = AgentsContainer( db_client=db_repo_container.db_client, @@ -43,13 +64,7 @@ namespace_repository=db_repo_container.namespace_repository, agent_repository=db_repo_container.agent_repository, workflow_repository=db_repo_container.workflow_repository, -) - -plugins_container = PluginsContainer( - db_client=db_repo_container.db_client, - cloud_manager=common_container.cloud_storage_manager, - dynamic_query_repository=db_repo_container.dynamic_query_repository, - cache_manager=db_repo_container.cache_manager, + message_processor_repository=plugins_container.message_processor_repository, ) common_container.wire( diff --git a/wavefront/server/modules/agents_module/agents_module/agents_container.py b/wavefront/server/modules/agents_module/agents_module/agents_container.py index eccfce3c..6e3269ce 100644 --- a/wavefront/server/modules/agents_module/agents_module/agents_container.py +++ b/wavefront/server/modules/agents_module/agents_module/agents_container.py @@ -28,6 +28,10 @@ class AgentsContainer(containers.DeclarativeContainer): workflow_repository = providers.Dependency() + message_processor_repository = providers.Dependency() + + message_processor_bucket_name = providers.Dependency() + namespace_service = providers.Singleton( NamespaceService, namespace_repository=namespace_repository, @@ -41,6 +45,8 @@ class AgentsContainer(containers.DeclarativeContainer): cloud_storage_manager=cloud_storage_manager, cache_manager=cache_manager, bucket_name=config.agents.agent_yaml_bucket, + message_processor_repository=message_processor_repository, + message_processor_bucket_name=message_processor_bucket_name, ) # Agent inference service @@ -49,6 +55,9 @@ class AgentsContainer(containers.DeclarativeContainer): cache_manager=cache_manager, tool_loader=tool_loader, agent_crud_service=agent_crud_service, + message_processor_repository=message_processor_repository, + cloud_storage_manager=cloud_storage_manager, + message_processor_bucket_name=message_processor_bucket_name, ) workflow_crud_service = providers.Singleton( diff --git a/wavefront/server/modules/agents_module/agents_module/services/agent_crud_service.py b/wavefront/server/modules/agents_module/agents_module/services/agent_crud_service.py index 3324edda..0521aa2a 100644 --- a/wavefront/server/modules/agents_module/agents_module/services/agent_crud_service.py +++ b/wavefront/server/modules/agents_module/agents_module/services/agent_crud_service.py @@ -5,6 +5,7 @@ from db_repo_module.cache.cache_manager import CacheManager from db_repo_module.models.agent import Agent +from db_repo_module.models.message_processors import MessageProcessors from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository from flo_cloud.cloud_storage import CloudStorageManager from flo_cloud.exceptions import CloudStorageFileNotFoundError @@ -31,6 +32,8 @@ def __init__( cloud_storage_manager: CloudStorageManager, cache_manager: CacheManager, bucket_name: str, + message_processor_repository: SQLAlchemyRepository[MessageProcessors], + message_processor_bucket_name: str, ): """ Initialize the agent CRUD service @@ -41,15 +44,19 @@ def __init__( cloud_storage_manager: Cloud storage manager instance cache_manager: Cache manager instance bucket_name: Name of the bucket containing agent YAML files + message_processor_repository: Repository for message processors + message_processor_bucket_name: Name of the bucket containing message processor YAML files """ self.agent_repository = agent_repository self.namespace_service = namespace_service self.cloud_storage_manager = cloud_storage_manager self.cache_manager = cache_manager self.bucket_name = bucket_name + self.message_processor_repository = message_processor_repository + self.message_processor_bucket_name = message_processor_bucket_name self.cache_ttl = 3600 # 1 hour for agents - def _validate_yaml_content( + async def _validate_yaml_content( self, yaml_content: str, namespace: str, @@ -78,12 +85,29 @@ def _validate_yaml_content( for tool in yaml_tools: tool_name = tool.get('name', None) if tool_name: - # Find the corresponding Tool object from tool_available list + # First, try to find in tool_available list + tool_found = False for tool_obj in tool_available: if tool_obj.name == tool_name: tool_registry[tool_name] = tool_obj + tool_found = True break + # If not found, check if it's a message processor + if not tool_found: + tool_obj = await self._try_load_message_processor_tool( + tool_name + ) + if tool_obj: + tool_registry[tool_name] = tool_obj + tool_found = True + + # If still not found, log warning (AgentBuilder will fail with better error) + if not tool_found: + logger.warning( + f'Tool {tool_name} not found in available tools or message processors' + ) + AgentBuilder.from_yaml( yaml_str=yaml_content, tool_registry=tool_registry, @@ -99,6 +123,78 @@ def _validate_yaml_content( ) raise ValueError(f'Invalid agent YAML configuration: {str(e)}') + async def _try_load_message_processor_tool(self, tool_name: str) -> Optional[Tool]: + """ + Attempt to load a message processor as a Tool object. + + Args: + tool_name: Name of the tool (should match message processor name) + + Returns: + Tool object if message processor found, None otherwise + """ + from tools_module.utils.message_processor_fn import execute_message_processor_fn + + try: + # Query message processor by name + processor = await self.message_processor_repository.find_one(name=tool_name) + + if not processor: + return None + + # Load YAML to get input_schema + yaml_key = f'message_processors/v1/{processor.source}' + try: + yaml_bytes = self.cloud_storage_manager.read_file( + self.message_processor_bucket_name, yaml_key + ) + yaml_content = yaml_bytes.decode('utf-8') + yaml_dict = yaml.safe_load(yaml_content) + + # Extract parameters from input_schema + input_schema = yaml_dict.get('input_schema', {}) + properties = input_schema.get('properties', {}) + + # Build parameters dict for Tool + parameters = { + 'message_processor_id': { + 'type': 'string', + 'description': 'UUID of the message processor', + } + } + + for param_name, param_spec in properties.items(): + parameters[param_name] = { + 'type': param_spec.get('type', 'string'), + 'description': param_spec.get('description', ''), + } + + # Create Tool object + description = yaml_dict.get( + 'description', + processor.description or 'Message processor function', + ) + + tool = Tool( + name=tool_name, + description=description, + function=execute_message_processor_fn, + parameters=parameters, + ) + + logger.info(f'Dynamically loaded message processor tool: {tool_name}') + return tool + + except Exception as e: + logger.warning( + f'Failed to load YAML for message processor {tool_name}: {str(e)}' + ) + return None + + except Exception as e: + logger.debug(f'Message processor {tool_name} not found: {str(e)}') + return None + async def create_agent( self, name: str, @@ -129,7 +225,7 @@ async def create_agent( validate_agent_workflow_name(name, type='agent') # Validate YAML content before proceeding - self._validate_yaml_content( + await self._validate_yaml_content( yaml_content, namespace, name, tool_available, access_token, app_key ) @@ -338,7 +434,7 @@ async def update_agent( raise ValueError(f'Agent not found with ID: {agent_id}') # Validate YAML content - self._validate_yaml_content( + await self._validate_yaml_content( yaml_content, agent.namespace, agent.name, diff --git a/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py b/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py index 4f5c56a5..4043f062 100644 --- a/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py +++ b/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py @@ -5,10 +5,15 @@ from agents_module.services.agent_crud_service import AgentCrudService from db_repo_module.cache.cache_manager import CacheManager from db_repo_module.models.llm_inference_config import LlmInferenceConfig +from db_repo_module.models.message_processors import MessageProcessors +from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository from flo_ai import AgentBuilder, Agent, BaseMessage, FloUtils from flo_ai.llm import OpenAI, Anthropic, Gemini, OllamaLLM, OpenAIVLLM +from flo_ai.tool.base_tool import Tool +from flo_cloud.cloud_storage import CloudStorageManager from common_module.log.logger import logger from tools_module.registry.tool_loader import ToolLoader +from tools_module.utils.message_processor_fn import execute_message_processor_fn import yaml @@ -20,6 +25,9 @@ def __init__( cache_manager: CacheManager, tool_loader: ToolLoader, agent_crud_service: AgentCrudService, + message_processor_repository: SQLAlchemyRepository[MessageProcessors], + cloud_storage_manager: CloudStorageManager, + message_processor_bucket_name: str, ): """ Initialize the agent inference service @@ -28,10 +36,16 @@ def __init__( cache_manager: Cache manager instance tool_loader: Tool loader instance agent_crud_service: Agent CRUD service for fetching agent YAML + message_processor_repository: Repository for message processors + cloud_storage_manager: Cloud storage manager instance + message_processor_bucket_name: Name of the bucket containing message processor YAML files """ self.cache_manager = cache_manager self.tool_loader = tool_loader self.agent_crud_service = agent_crud_service + self.message_processor_repository = message_processor_repository + self.cloud_storage_manager = cloud_storage_manager + self.message_processor_bucket_name = message_processor_bucket_name async def create_agent_from_yaml( self, @@ -61,8 +75,20 @@ async def create_agent_from_yaml( if tool_names: logger.info(f'Loading tools for agent {agent_name}: {tool_names}') for tool in tool_names: - tools = self.tool_loader.load_tool_with_name(tool.get('name')) - tool_register[tool.get('name')] = tools + tool_name = tool.get('name') + # First try to load from static registry + tools = self.tool_loader.load_tool_with_name(tool_name) + + # If not found, try loading as message processor + if tools is None: + tools = await self._try_load_message_processor_tool(tool_name) + + if tools: + tool_register[tool_name] = tools + else: + logger.warning( + f'Tool {tool_name} not found in registry or message processors' + ) else: logger.warning(f'No tools were loaded for agent {agent_name}') @@ -85,6 +111,77 @@ async def create_agent_from_yaml( logger.info(f'Successfully created agent for agent: {agent_name}') return agent + async def _try_load_message_processor_tool(self, tool_name: str) -> Optional[Tool]: + """ + Attempt to load a message processor as a Tool object. + + Args: + tool_name: Name of the tool (should match message processor name) + + Returns: + Tool object if message processor found, None otherwise + """ + + try: + # Query message processor by name + processor = await self.message_processor_repository.find_one(name=tool_name) + + if not processor: + return None + + # Load YAML to get input_schema + yaml_key = f'message_processors/v1/{processor.source}' + try: + yaml_bytes = self.cloud_storage_manager.read_file( + self.message_processor_bucket_name, yaml_key + ) + yaml_content = yaml_bytes.decode('utf-8') + yaml_dict = yaml.safe_load(yaml_content) + + # Extract parameters from input_schema + input_schema = yaml_dict.get('input_schema', {}) + properties = input_schema.get('properties', {}) + + # Build parameters dict for Tool + parameters = { + 'message_processor_id': { + 'type': 'string', + 'description': 'UUID of the message processor', + } + } + + for param_name, param_spec in properties.items(): + parameters[param_name] = { + 'type': param_spec.get('type', 'string'), + 'description': param_spec.get('description', ''), + } + + # Create Tool object + description = yaml_dict.get( + 'description', + processor.description or 'Message processor function', + ) + + tool = Tool( + name=tool_name, + description=description, + function=execute_message_processor_fn, + parameters=parameters, + ) + + logger.info(f'Dynamically loaded message processor tool: {tool_name}') + return tool + + except Exception as e: + logger.warning( + f'Failed to load YAML for message processor {tool_name}: {str(e)}' + ) + return None + + except Exception as e: + logger.debug(f'Message processor {tool_name} not found: {str(e)}') + return None + def _create_llm_instance(self, config: LlmInferenceConfig): """ Create LLM instance based on configuration diff --git a/wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py b/wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py index 3c119e2f..78dc3ae9 100644 --- a/wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py +++ b/wavefront/server/modules/plugins_module/plugins_module/services/message_processor_service.py @@ -115,7 +115,7 @@ async def save_message_processor_yaml( if not yaml_dict: raise ValueError('YAML content is empty or invalid') - required_fields = ['function', 'input_schema', 'type'] + required_fields = ['function', 'input_schema', 'type', 'description'] missing_fields = [field for field in required_fields if field not in yaml_dict] if missing_fields: raise ValueError(f'YAML must contain required fields: {missing_fields}') diff --git a/wavefront/server/modules/tools_module/tools_module/available_tools.json b/wavefront/server/modules/tools_module/tools_module/available_tools.json index d28cdeec..0fe8e069 100644 --- a/wavefront/server/modules/tools_module/tools_module/available_tools.json +++ b/wavefront/server/modules/tools_module/tools_module/available_tools.json @@ -157,5 +157,15 @@ "email_body" ], "category": "email" + }, + "trigger_message_processor": { + "name": "trigger_message_processor", + "description": "Execute a message processor function. This template gets expanded into individual tools for each message processor.", + "parameters": {}, + "prefill_values": [ + "message_processor_id" + ], + "required": [], + "category": "message_processor" } } diff --git a/wavefront/server/modules/tools_module/tools_module/message_processor/provider.py b/wavefront/server/modules/tools_module/tools_module/message_processor/provider.py new file mode 100644 index 00000000..e301518d --- /dev/null +++ b/wavefront/server/modules/tools_module/tools_module/message_processor/provider.py @@ -0,0 +1,156 @@ +from typing import List, Dict, Any +import yaml +from tools_module.interfaces.tool_details_provider import ToolDetailsProvider +from db_repo_module.repositories.sql_alchemy_repository import SQLAlchemyRepository +from db_repo_module.models.message_processors import MessageProcessors +from tools_module.models.tool_schemas import ToolExecutionDetails +from flo_cloud.cloud_storage import CloudStorageManager +from common_module.log.logger import logger + + +class MessageProcessorToolDetailsProvider(ToolDetailsProvider): + """Provider for expanding message processor tools from database + YAML definitions""" + + def __init__( + self, + message_processor_repository: SQLAlchemyRepository[MessageProcessors], + cloud_manager: CloudStorageManager, + message_processor_bucket_name: str, + ): + self.message_processor_repository = message_processor_repository + self.cloud_manager = cloud_manager + self.message_processor_bucket_name = message_processor_bucket_name + self.prefix = 'message_processors/v1' + + def can_handle(self, category: str) -> bool: + return category == 'message_processor' + + async def get_tool_details( + self, tool_metadata: Dict[str, Any] + ) -> List[ToolExecutionDetails]: + """ + Expand the trigger_message_processor template into individual tools. + + For each message processor in the database: + 1. Fetch the processor metadata + 2. Load YAML from cloud storage + 3. Parse input_schema to get parameters + 4. Create a ToolExecutionDetails with processor-specific params + """ + tool_details = [] + + # Fetch all message processors from database + all_processors = await self.message_processor_repository.find() + + for processor in all_processors: + try: + # Load YAML content from cloud storage + yaml_content = await self._load_yaml_content(processor) + + # Parse YAML to extract schema + yaml_dict = yaml.safe_load(yaml_content) + + # Validate YAML structure + if not self._validate_yaml_structure(yaml_dict): + # Skip processors with invalid YAML + logger.warning( + f'Invalid YAML structure for message processor {processor.name}, skipping' + ) + continue + + # Extract parameters from input_schema + input_schema = yaml_dict.get('input_schema', {}) + parameters = self._convert_schema_to_parameters(input_schema) + required = input_schema.get('required', []) + + # Use description from YAML if available, otherwise from processor + description = yaml_dict.get( + 'description', processor.description or 'Message processor function' + ) + + # Create tool details with processor-specific parameters + tool_details.append( + ToolExecutionDetails( + name=processor.name, # Each processor becomes its own tool + resource_name=processor.name, + prefill_parameter_names=['message_processor_id'], + prefilled_value={ + 'message_processor_id': str(processor.id), + }, + required=required, + parameters=parameters, + description=description, + category='message_processor', + ) + ) + except Exception as e: + # Log error but continue processing other processors + logger.warning( + f'Error loading message processor {processor.name}: {str(e)}, skipping' + ) + continue + + return tool_details + + async def _load_yaml_content(self, processor: MessageProcessors) -> str: + """Load YAML content from cloud storage""" + filepath = f'{self.prefix}/{processor.source}' + yaml_bytes = self.cloud_manager.read_file( + self.message_processor_bucket_name, filepath + ) + return yaml_bytes.decode('utf-8') + + def _validate_yaml_structure(self, yaml_dict: Dict[str, Any]) -> bool: + """Validate that YAML has required fields for tool definition""" + required_fields = ['function', 'input_schema', 'type'] + if not all(field in yaml_dict for field in required_fields): + return False + + input_schema = yaml_dict.get('input_schema', {}) + if 'properties' not in input_schema: + return False + + return True + + def _convert_schema_to_parameters( + self, input_schema: Dict[str, Any] + ) -> Dict[str, Any]: + """ + Convert YAML input_schema to tool parameters format. + + YAML format: + properties: + number: + type: number + description: The number to process + + Tool format: + { + "number": { + "type": "number", + "description": "The number to process" + } + } + """ + properties = input_schema.get('properties', {}) + + # Add message_processor_id to parameters (it's prefilled) + parameters = { + 'message_processor_id': { + 'type': 'string', + 'description': 'UUID of the message processor (automatically filled)', + } + } + + # Add all properties from YAML input_schema + for param_name, param_spec in properties.items(): + parameters[param_name] = { + 'type': param_spec.get('type', 'string'), + 'description': param_spec.get('description', ''), + } + + # Handle additional schema properties if needed + if 'items' in param_spec: + parameters[param_name]['items'] = param_spec['items'] + + return parameters diff --git a/wavefront/server/modules/tools_module/tools_module/registry/function_registry.py b/wavefront/server/modules/tools_module/tools_module/registry/function_registry.py index f89b7f42..4a7d776e 100644 --- a/wavefront/server/modules/tools_module/tools_module/registry/function_registry.py +++ b/wavefront/server/modules/tools_module/tools_module/registry/function_registry.py @@ -13,6 +13,9 @@ from tools_module.registry.registries.util_function_registry import ( UTIL_FUNCTION_REGISTRY, ) +from tools_module.registry.registries.message_processor_registry import ( + MESSAGE_PROCESSOR_REGISTRY, +) # TODO: Import other category registries as they are implemented @@ -37,6 +40,7 @@ def _merge_registries(*registries): KNOWLEDGE_BASE_REGISTRY, EMAIL_REGISTRY, UTIL_FUNCTION_REGISTRY, + MESSAGE_PROCESSOR_REGISTRY, ) diff --git a/wavefront/server/modules/tools_module/tools_module/registry/registries/message_processor_registry.py b/wavefront/server/modules/tools_module/tools_module/registry/registries/message_processor_registry.py new file mode 100644 index 00000000..518dea3d --- /dev/null +++ b/wavefront/server/modules/tools_module/tools_module/registry/registries/message_processor_registry.py @@ -0,0 +1,16 @@ +""" +Message Processor Tools Registry + +Contains the mapping from message processor tool names to their execution function. +Since all message processors use the same execution function (with different IDs), +this registry uses a single trigger function. +""" + +from tools_module.utils.message_processor_fn import execute_message_processor_fn + + +# For message processors, we use a single execution function +# The actual processor is selected via the message_processor_id parameter +MESSAGE_PROCESSOR_REGISTRY = { + 'trigger_message_processor': execute_message_processor_fn, +} diff --git a/wavefront/server/modules/tools_module/tools_module/registry/tool_loader.py b/wavefront/server/modules/tools_module/tools_module/registry/tool_loader.py index e6dc0dd7..d4f88740 100644 --- a/wavefront/server/modules/tools_module/tools_module/registry/tool_loader.py +++ b/wavefront/server/modules/tools_module/tools_module/registry/tool_loader.py @@ -8,7 +8,10 @@ class ToolLoader: """Handles loading and management of tools from the registry""" - def __init__(self, tools_json_path: Optional[str] = None): + def __init__( + self, + tools_json_path: Optional[str] = None, + ): """ Initialize tool loader diff --git a/wavefront/server/modules/tools_module/tools_module/tools_container.py b/wavefront/server/modules/tools_module/tools_module/tools_container.py index fc34f76a..abf8f4ac 100644 --- a/wavefront/server/modules/tools_module/tools_module/tools_container.py +++ b/wavefront/server/modules/tools_module/tools_module/tools_container.py @@ -6,6 +6,7 @@ from tools_module.datasources.provider import DatasourceToolDetailsProvider from tools_module.knowlegebase.provider import KnowledgeBaseToolDetailsProvider +from tools_module.message_processor.provider import MessageProcessorToolDetailsProvider from tools_module.services.default_tool_provider import DefaultToolDetailsProvider @@ -15,6 +16,9 @@ class ToolsContainer(containers.DeclarativeContainer): datasource_repository = providers.Dependency() knowledge_base_repository = providers.Dependency() knowledge_base_inference_repository = providers.Dependency() + message_processor_repository = providers.Dependency() + cloud_manager = providers.Dependency() + message_processor_bucket_name = providers.Dependency() # Tool loader tool_loader = providers.Singleton( ToolLoader, @@ -32,6 +36,13 @@ class ToolsContainer(containers.DeclarativeContainer): knowledge_base_inference_repository=knowledge_base_inference_repository, ) + message_processor_tool_provider = providers.Singleton( + MessageProcessorToolDetailsProvider, + message_processor_repository=message_processor_repository, + cloud_manager=cloud_manager, + message_processor_bucket_name=message_processor_bucket_name, + ) + default_tool_provider = providers.Singleton(DefaultToolDetailsProvider) # Tool service @@ -41,6 +52,7 @@ class ToolsContainer(containers.DeclarativeContainer): tool_providers=providers.List( datasource_tool_provider, knowledge_base_tool_provider, + message_processor_tool_provider, default_tool_provider, ), ) diff --git a/wavefront/server/modules/tools_module/tools_module/utils/message_processor_fn.py b/wavefront/server/modules/tools_module/tools_module/utils/message_processor_fn.py index 11189dcf..ce327eb3 100644 --- a/wavefront/server/modules/tools_module/tools_module/utils/message_processor_fn.py +++ b/wavefront/server/modules/tools_module/tools_module/utils/message_processor_fn.py @@ -1,4 +1,3 @@ -from typing import Dict, Any import json from plugins_module.controllers.message_processor_controller import ( execute_message_processor, @@ -6,19 +5,18 @@ ) -async def execute_message_processor_fn( - message_processor_id: str, - input_data: Dict[str, Any], -) -> str: - """Process a message using the message processor function +async def execute_message_processor_fn(message_processor_id: str, **kwargs) -> str: + """Execute a message processor function Args: - message_processor_id: The ID of the message processor to execute - input_data: The input data to pass to the message processor (dict of key-value pairs) + message_processor_id: UUID of the message processor to execute + **kwargs: Dynamic parameters based on processor's input_schema Returns: - The result from the message processor execution as a string + Result from message processor execution as string """ + # Remove message_processor_id from kwargs (it's not part of input_data) + input_data = {k: v for k, v in kwargs.items() if k != 'message_processor_id'} payload = ExecuteMessageProcessorPayload(input_data=input_data) response = await execute_message_processor(message_processor_id, payload) From b33c5f2d9f8a2719db142feb2e308da9cdb16656 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Wed, 24 Dec 2025 16:03:47 +0530 Subject: [PATCH 2/7] minor change --- .../server/background_jobs/workflow_job/workflow_job/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wavefront/server/background_jobs/workflow_job/workflow_job/main.py b/wavefront/server/background_jobs/workflow_job/workflow_job/main.py index 2bbf20b6..cfba1b66 100644 --- a/wavefront/server/background_jobs/workflow_job/workflow_job/main.py +++ b/wavefront/server/background_jobs/workflow_job/workflow_job/main.py @@ -51,7 +51,7 @@ knowledge_base_inference_repository=db_repo_container.knowledge_base_inference_repository, message_processor_repository=plugins_container.message_processor_repository, cloud_manager=common_container.cloud_storage_manager, - bucket_name=bucket_name, + message_processor_bucket_name=bucket_name, ) agents_container = AgentsContainer( From 5120991a5fcee4e8604f93b9ba480959a22f09fc Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Mon, 29 Dec 2025 18:31:58 +0530 Subject: [PATCH 3/7] added payload, desc to api_services/api_id - also api_services can now be added as tools --- .../server/apps/floware/floware/server.py | 20 +- .../workflow_job/workflow_job/main.py | 3 + .../agents_module/agents_container.py | 4 + .../services/agent_crud_service.py | 142 ++- .../services/agent_inference_service.py | 137 +++ .../api_services_module/config/parser.py | 48 + .../api_services_module/core/proxy.py | 36 + .../api_services_module/models/pipeline.py | 1 + .../api_services_module/models/service.py | 21 + .../api_services_module/pipeline/builder.py | 11 +- .../api_services_module/pipeline/stages.py | 181 +++- .../tests/test_payload_validation.py | 855 ++++++++++++++++++ .../tools_module/api_service/provider.py | 218 +++++ .../tools_module/available_tools.json | 12 + .../registry/function_registry.py | 4 + .../registries/api_service_registry.py | 16 + .../tools_module/tools_container.py | 8 + .../tools_module/utils/api_service_fn.py | 51 +- 18 files changed, 1743 insertions(+), 25 deletions(-) create mode 100644 wavefront/server/modules/api_services_module/tests/test_payload_validation.py create mode 100644 wavefront/server/modules/tools_module/tools_module/api_service/provider.py create mode 100644 wavefront/server/modules/tools_module/tools_module/registry/registries/api_service_registry.py diff --git a/wavefront/server/apps/floware/floware/server.py b/wavefront/server/apps/floware/floware/server.py index caa4f38c..01020d1c 100644 --- a/wavefront/server/apps/floware/floware/server.py +++ b/wavefront/server/apps/floware/floware/server.py @@ -143,6 +143,15 @@ product_analysis_container = ProductAnalysisContainer() +# API Services Container (must be created before tools_container) +api_services_container: ApiServicesContainer = create_api_services_container( + api_service_repository=db_repo_container.api_services_repository, + cloud_storage_manager=common_container.cloud_storage_manager, + db_client=db_repo_container.db_client, + cache_manager=db_repo_container.cache_manager, + response_formatter=common_container.response_formatter, +) + cloud_provider = os.getenv('CLOUD_PROVIDER', 'aws') bucket_name = ( os.getenv('AWS_GOLD_ASSET_BUCKET_NAME') @@ -155,6 +164,7 @@ knowledge_base_repository=db_repo_container.knowledge_base_repository, knowledge_base_inference_repository=db_repo_container.knowledge_base_inference_repository, message_processor_repository=plugins_container.message_processor_repository, + api_services_manager=api_services_container.api_service_manager, cloud_manager=common_container.cloud_storage_manager, message_processor_bucket_name=bucket_name, ) @@ -171,6 +181,7 @@ workflow_repository=db_repo_container.workflow_repository, message_processor_repository=plugins_container.message_processor_repository, message_processor_bucket_name=bucket_name, + api_services_manager=api_services_container.api_service_manager, ) inference_container = InferenceContainer( @@ -183,15 +194,6 @@ cache_manager=db_repo_container.cache_manager, ) -# API Services Container -api_services_container: ApiServicesContainer = create_api_services_container( - api_service_repository=db_repo_container.api_services_repository, - cloud_storage_manager=common_container.cloud_storage_manager, - db_client=db_repo_container.db_client, - cache_manager=db_repo_container.cache_manager, - response_formatter=common_container.response_formatter, -) - voice_agents_container = VoiceAgentsContainer( db_client=db_repo_container.db_client, cache_manager=db_repo_container.cache_manager, diff --git a/wavefront/server/background_jobs/workflow_job/workflow_job/main.py b/wavefront/server/background_jobs/workflow_job/workflow_job/main.py index cfba1b66..816dfc2b 100644 --- a/wavefront/server/background_jobs/workflow_job/workflow_job/main.py +++ b/wavefront/server/background_jobs/workflow_job/workflow_job/main.py @@ -50,6 +50,7 @@ knowledge_base_repository=db_repo_container.knowledge_base_repository, knowledge_base_inference_repository=db_repo_container.knowledge_base_inference_repository, message_processor_repository=plugins_container.message_processor_repository, + api_services_manager=api_services_container.api_service_manager, cloud_manager=common_container.cloud_storage_manager, message_processor_bucket_name=bucket_name, ) @@ -65,6 +66,8 @@ agent_repository=db_repo_container.agent_repository, workflow_repository=db_repo_container.workflow_repository, message_processor_repository=plugins_container.message_processor_repository, + message_processor_bucket_name=bucket_name, + api_services_manager=api_services_container.api_service_manager, ) common_container.wire( diff --git a/wavefront/server/modules/agents_module/agents_module/agents_container.py b/wavefront/server/modules/agents_module/agents_module/agents_container.py index 6e3269ce..cc5e77b9 100644 --- a/wavefront/server/modules/agents_module/agents_module/agents_container.py +++ b/wavefront/server/modules/agents_module/agents_module/agents_container.py @@ -32,6 +32,8 @@ class AgentsContainer(containers.DeclarativeContainer): message_processor_bucket_name = providers.Dependency() + api_services_manager = providers.Dependency() + namespace_service = providers.Singleton( NamespaceService, namespace_repository=namespace_repository, @@ -47,6 +49,7 @@ class AgentsContainer(containers.DeclarativeContainer): bucket_name=config.agents.agent_yaml_bucket, message_processor_repository=message_processor_repository, message_processor_bucket_name=message_processor_bucket_name, + api_services_manager=api_services_manager, ) # Agent inference service @@ -58,6 +61,7 @@ class AgentsContainer(containers.DeclarativeContainer): message_processor_repository=message_processor_repository, cloud_storage_manager=cloud_storage_manager, message_processor_bucket_name=message_processor_bucket_name, + api_services_manager=api_services_manager, ) workflow_crud_service = providers.Singleton( diff --git a/wavefront/server/modules/agents_module/agents_module/services/agent_crud_service.py b/wavefront/server/modules/agents_module/agents_module/services/agent_crud_service.py index 0521aa2a..3a717a31 100644 --- a/wavefront/server/modules/agents_module/agents_module/services/agent_crud_service.py +++ b/wavefront/server/modules/agents_module/agents_module/services/agent_crud_service.py @@ -1,5 +1,6 @@ import json import yaml +import re from typing import List, Optional from uuid import UUID @@ -20,6 +21,8 @@ from agents_module.utils.validation_utils import validate_agent_workflow_name from flo_ai import AgentBuilder from flo_ai.tool.base_tool import Tool +from api_services_module.core.manager import ApiServicesManager +from api_services_module.config.parser import ServiceDefinitionParser class AgentCrudService: @@ -34,6 +37,7 @@ def __init__( bucket_name: str, message_processor_repository: SQLAlchemyRepository[MessageProcessors], message_processor_bucket_name: str, + api_services_manager: Optional[ApiServicesManager] = None, ): """ Initialize the agent CRUD service @@ -54,6 +58,7 @@ def __init__( self.bucket_name = bucket_name self.message_processor_repository = message_processor_repository self.message_processor_bucket_name = message_processor_bucket_name + self.api_services_manager = api_services_manager self.cache_ttl = 3600 # 1 hour for agents async def _validate_yaml_content( @@ -102,10 +107,17 @@ async def _validate_yaml_content( tool_registry[tool_name] = tool_obj tool_found = True + # If still not found, try loading as API service + if not tool_found: + tool_obj = await self._try_load_api_service_tool(tool_name) + if tool_obj: + tool_registry[tool_name] = tool_obj + tool_found = True + # If still not found, log warning (AgentBuilder will fail with better error) if not tool_found: logger.warning( - f'Tool {tool_name} not found in available tools or message processors' + f'Tool {tool_name} not found in available tools, message processors, or API services' ) AgentBuilder.from_yaml( @@ -195,6 +207,134 @@ async def _try_load_message_processor_tool(self, tool_name: str) -> Optional[Too logger.debug(f'Message processor {tool_name} not found: {str(e)}') return None + async def _try_load_api_service_tool(self, tool_name: str) -> Optional[Tool]: + """ + Attempt to load an API service as a Tool object. + + Args: + tool_name: Name of the tool in format "service_id_api_id" + + Returns: + Tool object if API service found, None otherwise + """ + from tools_module.utils.api_service_fn import execute_api_service_fn + + try: + # Check if api_services_manager is available + if not self.api_services_manager: + return None + + # Parse tool name to extract service_id and api_id + if '_' not in tool_name: + return None + + parts = tool_name.split('_', 1) + service_id = parts[0] + api_id = parts[1] + + # Query API service by id + service = await self.api_services_manager.get_api_service(id=service_id) + + if not service or not service.is_active: + return None + + # Load YAML content + yaml_content = self.api_services_manager.fetch_service_def(service) + + # Parse YAML to ServiceDefinition + service_def = ServiceDefinitionParser.parse_yaml_string(yaml_content) + + # Find the specific API config + api_config = service_def.get_api_by_id(api_id) + if not api_config: + return None + + # Build parameters dict for Tool + parameters = { + 'api_service_id': { + 'type': 'string', + 'description': 'ID of the API service (automatically filled)', + }, + 'api_id': { + 'type': 'string', + 'description': 'ID of the API endpoint (automatically filled)', + }, + 'api_version': { + 'type': 'string', + 'description': 'API version (automatically filled)', + }, + } + + # Extract path parameters from path template + path_params = self._extract_path_params(api_config.path) + for param_name in path_params: + parameters[f'path_{param_name}'] = { + 'type': 'string', + 'description': f'Path parameter: {param_name}', + } + + # Add query parameters from backend_query_params + for param_name, default_value in api_config.backend_query_params.items(): + param_type = self._infer_type(default_value) + parameters[f'query_{param_name}'] = { + 'type': param_type, + 'description': f'Query parameter: {param_name}', + } + + # Add payload schema fields + if api_config.payload_schema: + for field in api_config.payload_schema.fields: + parameters[field.name] = { + 'type': field.type, + 'description': field.description + or f'Payload field: {field.name}', + } + + # Build description - use API's description if available + if api_config.description: + description = api_config.description + else: + # Fallback to default description + description = f'Execute {service_def.id} API: {api_config.id}. Method: {api_config.method.value}.' + if api_config.payload_schema and api_config.payload_schema.fields: + description += f' Accepts {len(api_config.payload_schema.fields)} parameter(s).' + + # Create Tool object + tool = Tool( + name=tool_name, + description=description, + function=execute_api_service_fn, + parameters=parameters, + ) + + logger.info(f'Dynamically loaded API service tool: {tool_name}') + return tool + + except Exception as e: + logger.debug(f'API service {tool_name} not found: {str(e)}') + return None + + def _extract_path_params(self, path: str) -> list: + """Extract parameter names from path template.""" + pattern = r'\{([^}]+)\}' + matches = re.findall(pattern, path) + return matches + + def _infer_type(self, value) -> str: + """Infer JSON schema type from Python value.""" + if isinstance(value, bool): + return 'boolean' + elif isinstance(value, int): + return 'integer' + elif isinstance(value, float): + return 'number' + elif isinstance(value, list): + return 'array' + elif isinstance(value, dict): + return 'object' + else: + return 'string' + async def create_agent( self, name: str, diff --git a/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py b/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py index 4043f062..19f3fc1c 100644 --- a/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py +++ b/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py @@ -14,7 +14,11 @@ from common_module.log.logger import logger from tools_module.registry.tool_loader import ToolLoader from tools_module.utils.message_processor_fn import execute_message_processor_fn +from tools_module.utils.api_service_fn import execute_api_service_fn +from api_services_module.core.manager import ApiServicesManager +from api_services_module.config.parser import ServiceDefinitionParser import yaml +import re class AgentInferenceService: @@ -28,6 +32,7 @@ def __init__( message_processor_repository: SQLAlchemyRepository[MessageProcessors], cloud_storage_manager: CloudStorageManager, message_processor_bucket_name: str, + api_services_manager: Optional[ApiServicesManager] = None, ): """ Initialize the agent inference service @@ -39,9 +44,11 @@ def __init__( message_processor_repository: Repository for message processors cloud_storage_manager: Cloud storage manager instance message_processor_bucket_name: Name of the bucket containing message processor YAML files + api_services_manager: API services manager instance (optional) """ self.cache_manager = cache_manager self.tool_loader = tool_loader + self.api_services_manager = api_services_manager self.agent_crud_service = agent_crud_service self.message_processor_repository = message_processor_repository self.cloud_storage_manager = cloud_storage_manager @@ -83,6 +90,10 @@ async def create_agent_from_yaml( if tools is None: tools = await self._try_load_message_processor_tool(tool_name) + # If still not found, try loading as API service + if tools is None: + tools = await self._try_load_api_service_tool(tool_name) + if tools: tool_register[tool_name] = tools else: @@ -182,6 +193,132 @@ async def _try_load_message_processor_tool(self, tool_name: str) -> Optional[Too logger.debug(f'Message processor {tool_name} not found: {str(e)}') return None + async def _try_load_api_service_tool(self, tool_name: str) -> Optional[Tool]: + """ + Attempt to load an API service as a Tool object. + + Args: + tool_name: Name of the tool in format "service_id_api_id" + + Returns: + Tool object if API service found, None otherwise + """ + try: + # Check if api_services_manager is available + if not self.api_services_manager: + return None + + # Parse tool name to extract service_id and api_id + if '_' not in tool_name: + return None + + parts = tool_name.split('_', 1) + service_id = parts[0] + api_id = parts[1] + + # Query API service by id + service = await self.api_services_manager.get_api_service(id=service_id) + + if not service or not service.is_active: + return None + + # Load YAML content + yaml_content = self.api_services_manager.fetch_service_def(service) + + # Parse YAML to ServiceDefinition + service_def = ServiceDefinitionParser.parse_yaml_string(yaml_content) + + # Find the specific API config + api_config = service_def.get_api_by_id(api_id) + if not api_config: + return None + + # Build parameters dict for Tool + parameters = { + 'api_service_id': { + 'type': 'string', + 'description': 'ID of the API service (automatically filled)', + }, + 'api_id': { + 'type': 'string', + 'description': 'ID of the API endpoint (automatically filled)', + }, + 'api_version': { + 'type': 'string', + 'description': 'API version (automatically filled)', + }, + } + + # Extract path parameters from path template + path_params = self._extract_path_params(api_config.path) + for param_name in path_params: + parameters[f'path_{param_name}'] = { + 'type': 'string', + 'description': f'Path parameter: {param_name}', + } + + # Add query parameters from backend_query_params + for param_name, default_value in api_config.backend_query_params.items(): + param_type = self._infer_type(default_value) + parameters[f'query_{param_name}'] = { + 'type': param_type, + 'description': f'Query parameter: {param_name}', + } + + # Add payload schema fields + if api_config.payload_schema: + for field in api_config.payload_schema.fields: + parameters[field.name] = { + 'type': field.type, + 'description': field.description + or f'Payload field: {field.name}', + } + + # Build description - use API's description if available + if api_config.description: + description = api_config.description + else: + # Fallback to default description + description = f'Execute {service_def.id} API: {api_config.id}. Method: {api_config.method.value}.' + if api_config.payload_schema and api_config.payload_schema.fields: + description += f' Accepts {len(api_config.payload_schema.fields)} parameter(s).' + + # Create Tool object + tool = Tool( + name=tool_name, + description=description, + function=execute_api_service_fn, + parameters=parameters, + ) + + logger.info(f'Dynamically loaded API service tool: {tool_name}') + return tool + + except Exception as e: + logger.debug(f'API service {tool_name} not found: {str(e)}') + return None + + def _extract_path_params(self, path: str) -> list: + """Extract parameter names from path template.""" + pattern = r'\{([^}]+)\}' + matches = re.findall(pattern, path) + return matches + + def _infer_type(self, value) -> str: + """Infer JSON schema type from Python value.""" + if isinstance(value, bool): + return 'boolean' + elif isinstance(value, int): + return 'integer' + elif isinstance(value, float): + return 'number' + elif isinstance(value, list): + return 'array' + elif isinstance(value, dict): + return 'object' + else: + return 'string' + def _create_llm_instance(self, config: LlmInferenceConfig): """ Create LLM instance based on configuration diff --git a/wavefront/server/modules/api_services_module/api_services_module/config/parser.py b/wavefront/server/modules/api_services_module/api_services_module/config/parser.py index a86f4660..84da0321 100644 --- a/wavefront/server/modules/api_services_module/api_services_module/config/parser.py +++ b/wavefront/server/modules/api_services_module/api_services_module/config/parser.py @@ -8,6 +8,8 @@ ApiConfig, AuthType, HttpMethod, + PayloadFieldSchema, + PayloadSchema, ) @@ -106,6 +108,44 @@ def _parse_auth_config(auth_data: Dict[str, Any]) -> AuthConfig: return auth_config + @staticmethod + def _parse_payload_schema(schema_data: Dict[str, Any]) -> PayloadSchema: + """Parse payload schema configuration.""" + if not schema_data or 'fields' not in schema_data: + return None + + fields = [] + for field_data in schema_data['fields']: + # Validate required fields + field_name = field_data.get('name') + field_type = field_data.get('type') + + if not field_name: + raise ValueError('Payload field missing required attribute: name') + if not field_type: + raise ValueError( + f"Payload field '{field_name}' missing required attribute: type" + ) + + # Validate field type + valid_types = ['string', 'integer', 'number', 'boolean', 'array', 'object'] + if field_type not in valid_types: + raise ValueError( + f"Invalid payload field type '{field_type}' for field '{field_name}'. " + f"Must be one of: {', '.join(valid_types)}" + ) + + # Create field schema + field_schema = PayloadFieldSchema( + name=field_name, + type=field_type, + required=field_data.get('required', False), + description=field_data.get('description', ''), + ) + fields.append(field_schema) + + return PayloadSchema(fields=fields) + @staticmethod def _parse_api_configs(apis_data: List[Dict[str, Any]]) -> List[ApiConfig]: """Parse API configurations.""" @@ -136,17 +176,25 @@ def _parse_api_configs(apis_data: List[Dict[str, Any]]) -> List[ApiConfig]: f'Invalid HTTP method: {method_str}. Must be one of: {[m.value for m in HttpMethod]}' ) + # Parse payload schema if present + payload_schema = ServiceDefinitionParser._parse_payload_schema( + api_data.get('payload_schema', {}) + ) + api_config = ApiConfig( id=api_id, path=path, backend_path=backend_path, method=method, version=api_data.get('version', 'v1'), + description=api_data.get('description', ''), additional_headers=api_data.get('additional_headers', {}), + backend_query_params=api_data.get('backend_query_params', {}), output_mapper_enabled=api_data.get('output_mapper', {}).get( 'enabled', False ), output_mapper=api_data.get('output_mapper', {}).get('mapper', {}), + payload_schema=payload_schema, ) api_configs.append(api_config) diff --git a/wavefront/server/modules/api_services_module/api_services_module/core/proxy.py b/wavefront/server/modules/api_services_module/api_services_module/core/proxy.py index d91b330f..1a34fb3c 100644 --- a/wavefront/server/modules/api_services_module/api_services_module/core/proxy.py +++ b/wavefront/server/modules/api_services_module/api_services_module/core/proxy.py @@ -159,6 +159,17 @@ async def process_request( except PipelineException as e: logger.error(f'PipelineException: {str(e)}', exc_info=True) context.add_trace('proxy', f'Pipeline error: {str(e)}') + + # Special handling for payload validation errors + if 'payload_validator' in e.stage_name: + return ProxyResponse.error( + message=e.message, + trace=context.execution_trace, + status='validation_error', + http_status_code=400, # Bad Request for validation errors + ) + + # Default pipeline error handling return ProxyResponse.error( message=f'Pipeline error: {e.message}', trace=context.execution_trace, @@ -250,6 +261,28 @@ def remove_service(self, service_id: str): self.pipeline_cache.invalidate_service(service_id) logger.info(f'Invalidated pipelines for service: {service_id}') + def _serialize_payload_schema(self, payload_schema) -> Dict[str, Any]: + """ + Serialize PayloadSchema object to dictionary. + + Args: + payload_schema: PayloadSchema object + + Returns: + Dictionary representation of the schema + """ + return { + 'fields': [ + { + 'name': field.name, + 'type': field.type, + 'required': field.required, + 'description': field.description, + } + for field in payload_schema.fields + ] + } + def get_service_info(self, service_id: str) -> Dict[str, Any]: """ Get information about a service. @@ -281,6 +314,9 @@ def get_service_info(self, service_id: str) -> Dict[str, Any]: 'backend_query_params': api.backend_query_params, 'output_mapper_enabled': api.output_mapper_enabled, 'output_mapper': api.output_mapper, + 'payload_schema': self._serialize_payload_schema(api.payload_schema) + if api.payload_schema + else None, } for api in service_definition.apis ], diff --git a/wavefront/server/modules/api_services_module/api_services_module/models/pipeline.py b/wavefront/server/modules/api_services_module/api_services_module/models/pipeline.py index 12cf7069..7bb27a24 100644 --- a/wavefront/server/modules/api_services_module/api_services_module/models/pipeline.py +++ b/wavefront/server/modules/api_services_module/api_services_module/models/pipeline.py @@ -14,6 +14,7 @@ class StageType(Enum): AUTHENTICATOR = 'authenticator' HEADER_INJECTOR = 'header_injector' API_PROCESSOR = 'api_processor' + PAYLOAD_VALIDATOR = 'payload_validator' REQUEST_SENDER = 'request_sender' RESPONSE_MAPPER = 'response_mapper' COMPOSITE = 'composite' diff --git a/wavefront/server/modules/api_services_module/api_services_module/models/service.py b/wavefront/server/modules/api_services_module/api_services_module/models/service.py index a0db442b..0703f95d 100644 --- a/wavefront/server/modules/api_services_module/api_services_module/models/service.py +++ b/wavefront/server/modules/api_services_module/api_services_module/models/service.py @@ -23,6 +23,23 @@ class HttpMethod(Enum): PATCH = 'PATCH' +@dataclass +class PayloadFieldSchema: + """Schema definition for a single payload field.""" + + name: str + type: str # string, integer, number, boolean, array, object + required: bool = False + description: str = '' + + +@dataclass +class PayloadSchema: + """Complete payload schema definition.""" + + fields: List['PayloadFieldSchema'] = field(default_factory=list) + + @dataclass class AuthConfig: """Authentication configuration.""" @@ -53,6 +70,7 @@ class ApiConfig: backend_path: str method: HttpMethod version: str = 'v1' + description: str = '' additional_headers: Dict[str, str] = field(default_factory=dict) # Backend query parameters to be sent with the request backend_query_params: Dict[str, Any] = field(default_factory=dict) @@ -61,6 +79,9 @@ class ApiConfig: output_mapper_enabled: bool = False output_mapper: Dict[str, str] = field(default_factory=dict) + # Payload validation schema (for POST/PUT/PATCH requests) + payload_schema: Optional['PayloadSchema'] = None + @dataclass class ServiceDefinition: diff --git a/wavefront/server/modules/api_services_module/api_services_module/pipeline/builder.py b/wavefront/server/modules/api_services_module/api_services_module/pipeline/builder.py index 334ffe1d..4b14ed0a 100644 --- a/wavefront/server/modules/api_services_module/api_services_module/pipeline/builder.py +++ b/wavefront/server/modules/api_services_module/api_services_module/pipeline/builder.py @@ -8,6 +8,7 @@ RequestHeadersForwarderStage, HeaderInjectorStage, ApiProcessorStage, + PayloadValidatorStage, RequestSenderStage, ResponseMapperStage, ) @@ -49,7 +50,7 @@ def build_api_pipeline( """ Build API processing pipeline for a specific API. - Pipeline: [API Processor → Header Injector → Request Sender → Response Mapper] + Pipeline: [API Processor → Header Injector → Payload Validator → Request Sender → Response Mapper] Note: Skipping preprocessor and postprocessor as requested """ stages: List[PipelineStage] = [] @@ -65,11 +66,15 @@ def build_api_pipeline( ) stages.append(api_header_injector) - # 3. Request sender stage + # 3. Payload validator stage (validates request body before sending) + payload_validator = PayloadValidatorStage(api_config) + stages.append(payload_validator) + + # 4. Request sender stage request_sender = RequestSenderStage() stages.append(request_sender) - # 4. Response mapper stage + # 5. Response mapper stage response_mapper = ResponseMapperStage(api_config) stages.append(response_mapper) diff --git a/wavefront/server/modules/api_services_module/api_services_module/pipeline/stages.py b/wavefront/server/modules/api_services_module/api_services_module/pipeline/stages.py index 5a7aa9d6..610d7f66 100644 --- a/wavefront/server/modules/api_services_module/api_services_module/pipeline/stages.py +++ b/wavefront/server/modules/api_services_module/api_services_module/pipeline/stages.py @@ -2,7 +2,7 @@ import asyncio import httpx -from typing import Dict, Any +from typing import Dict, Any, Optional from urllib.parse import urljoin from ..models.pipeline import ( @@ -445,3 +445,182 @@ def get_stage_type(self) -> StageType: def get_name(self) -> str: """Return stage name.""" return f'response_mapper_{self.api_config.id}' + + +class PayloadValidatorStage(PipelineStage): + """Pipeline stage for validating request payload against schema.""" + + def __init__(self, api_config: ApiConfig): + self.api_config = api_config + + async def execute(self, context: PipelineContext) -> PipelineContext: + """Validate request payload against schema.""" + context.add_trace(self.get_name(), 'Starting payload validation') + + try: + # Skip validation if no schema defined + if not self.api_config.payload_schema: + context.add_trace( + self.get_name(), 'No payload schema defined, skipping validation' + ) + return context + + # Skip validation if method is not POST/PUT/PATCH + method = context.method.upper() + if method not in ['POST', 'PUT', 'PATCH']: + context.add_trace( + self.get_name(), + f'Method {method} does not require payload validation, skipping', + ) + return context + + # Skip validation if no body provided + if context.body is None: + context.add_trace(self.get_name(), 'No request body provided') + # Check if there are required fields + required_fields = [ + f.name for f in self.api_config.payload_schema.fields if f.required + ] + if required_fields: + error_msg = f"Missing required fields: {', '.join(required_fields)}" + context.add_trace( + self.get_name(), f'Validation failed: {error_msg}' + ) + raise PipelineException( + f'Payload validation failed: {error_msg}', + self.get_name(), + context, + ) + return context + + # Validate payload + validation_errors = self._validate_payload( + context.body, self.api_config.payload_schema + ) + + if validation_errors: + error_msg = '; '.join(validation_errors) + context.add_trace(self.get_name(), f'Validation failed: {error_msg}') + raise PipelineException( + f'Payload validation failed: {error_msg}', self.get_name(), context + ) + + context.add_trace( + self.get_name(), + f'Payload validation completed successfully ({len(self.api_config.payload_schema.fields)} fields checked)', + ) + return context + + except PipelineException: + # Re-raise pipeline exceptions + raise + except Exception as e: + context.add_trace(self.get_name(), f'Payload validation error: {str(e)}') + raise PipelineException( + f'Payload validation error: {str(e)}', self.get_name(), context + ) + + def _validate_payload(self, payload: Any, schema) -> list: + """ + Validate payload against schema. + + Args: + payload: Request payload (should be dict) + schema: PayloadSchema object + + Returns: + List of validation error messages (empty if valid) + """ + errors = [] + + # Payload must be a dictionary + if not isinstance(payload, dict): + errors.append( + f'Payload must be an object/dictionary, got {type(payload).__name__}' + ) + return errors + + # Check each field in schema + for field_schema in schema.fields: + field_name = field_schema.name + field_type = field_schema.type + required = field_schema.required + + # Check if field exists + if field_name not in payload or payload[field_name] is None: + if required: + errors.append(f"Missing required field '{field_name}'") + continue + + # Validate field type + value = payload[field_name] + type_error = self._validate_field_type(value, field_type, field_name) + if type_error: + errors.append(type_error) + + return errors + + def _validate_field_type( + self, value: Any, expected_type: str, field_name: str + ) -> Optional[str]: + """ + Validate individual field type. + + Args: + value: Field value to validate + expected_type: Expected type (string, integer, number, boolean, array, object) + field_name: Name of the field (for error messages) + + Returns: + Error message if invalid, None if valid + """ + actual_type = type(value).__name__ + + if expected_type == 'string': + if not isinstance(value, str): + return ( + f"Field '{field_name}' expected type string but got {actual_type}" + ) + + elif expected_type == 'integer': + # Must be int, not float (even if float is a whole number) + if not isinstance(value, int) or isinstance(value, bool): + return ( + f"Field '{field_name}' expected type integer but got {actual_type}" + ) + + elif expected_type == 'number': + # Can be int or float + if not isinstance(value, (int, float)) or isinstance(value, bool): + return ( + f"Field '{field_name}' expected type number but got {actual_type}" + ) + + elif expected_type == 'boolean': + if not isinstance(value, bool): + return ( + f"Field '{field_name}' expected type boolean but got {actual_type}" + ) + + elif expected_type == 'array': + if not isinstance(value, list): + return f"Field '{field_name}' expected type array but got {actual_type}" + + elif expected_type == 'object': + if not isinstance(value, dict): + return ( + f"Field '{field_name}' expected type object but got {actual_type}" + ) + + else: + return f"Field '{field_name}' has unknown type '{expected_type}'" + + return None + + def get_stage_type(self) -> StageType: + """Return payload validator stage type.""" + return StageType.PAYLOAD_VALIDATOR + + def get_name(self) -> str: + """Return stage name.""" + return f'payload_validator_{self.api_config.id}' diff --git a/wavefront/server/modules/api_services_module/tests/test_payload_validation.py b/wavefront/server/modules/api_services_module/tests/test_payload_validation.py new file mode 100644 index 00000000..3407daad --- /dev/null +++ b/wavefront/server/modules/api_services_module/tests/test_payload_validation.py @@ -0,0 +1,855 @@ +""" +Tests for payload validation functionality. + +Tests cover: +- Valid payload passes validation +- Missing required fields return 400 +- Wrong field types return 400 +- Extra fields are allowed +- Validation applies to POST/PUT/PATCH only +- Validation skipped when no schema defined +- Complex types (object, array) +- Detailed error messages +- Multiple validation errors +""" + +import pytest +from api_services_module.config.parser import ServiceDefinitionParser +from api_services_module.core.proxy import ApiProxy +from api_services_module.config.registry import ServiceRegistry +from api_services_module.models.service import ( + PayloadFieldSchema, + PayloadSchema, + ApiConfig, + HttpMethod, +) +from api_services_module.pipeline.stages import PayloadValidatorStage +from api_services_module.models.pipeline import PipelineContext, PipelineException + + +# ============================================================================ +# Fixtures +# ============================================================================ + + +@pytest.fixture +def payload_validation_yaml(): + """YAML configuration with payload validation.""" + return """ +service: + id: test-validation-service + base_url: https://api.test.com + auth: + id: test-auth + type: bearer + token: test-token + apis: + - id: create-user + path: /users + backend_path: /users + method: POST + payload_schema: + fields: + - name: name + type: string + required: true + description: User's full name + - name: email + type: string + required: true + description: User's email address + - name: age + type: integer + required: false + description: User's age + - name: is_active + type: boolean + required: false + description: Whether user is active + - id: update-user + path: /users/{id} + backend_path: /users/{id} + method: PUT + payload_schema: + fields: + - name: name + type: string + required: false + - name: email + type: string + required: false + - id: no-validation-api + path: /no-validation + backend_path: /no-validation + method: POST +""" + + +@pytest.fixture +def complex_types_yaml(): + """YAML configuration with complex types (array, object).""" + return """ +service: + id: complex-types-service + base_url: https://api.complex.com + auth: + id: complex-auth + type: bearer + token: complex-token + apis: + - id: create-order + path: /orders + backend_path: /orders + method: POST + payload_schema: + fields: + - name: customer + type: object + required: true + description: Customer object + - name: items + type: array + required: true + description: Order items + - name: total + type: number + required: true + description: Total amount +""" + + +@pytest.fixture +async def validation_service_registry(payload_validation_yaml): + """Service registry with validation-enabled service.""" + from tests.conftest import MockApiServicesManager + + yaml_map = {'test-validation-service': payload_validation_yaml} + manager = MockApiServicesManager(service_yaml_map=yaml_map) + registry = ServiceRegistry(manager) + await registry.load_from_db() + return registry + + +@pytest.fixture +async def complex_types_registry(complex_types_yaml): + """Service registry with complex types validation.""" + from tests.conftest import MockApiServicesManager + + yaml_map = {'complex-types-service': complex_types_yaml} + manager = MockApiServicesManager(service_yaml_map=yaml_map) + registry = ServiceRegistry(manager) + await registry.load_from_db() + return registry + + +# ============================================================================ +# Parser Tests +# ============================================================================ + + +def test_parse_payload_schema_valid(): + """Test parsing valid payload schema from YAML.""" + yaml_content = """ +service: + id: test-service + base_url: https://api.test.com + auth: + id: test-auth + type: bearer + token: test-token + apis: + - id: test-api + path: /test + backend_path: /test + method: POST + payload_schema: + fields: + - name: field1 + type: string + required: true + description: Test field + - name: field2 + type: integer + required: false +""" + + service_def = ServiceDefinitionParser.parse_yaml_string(yaml_content) + api_config = service_def.apis[0] + + assert api_config.payload_schema is not None + assert len(api_config.payload_schema.fields) == 2 + + field1 = api_config.payload_schema.fields[0] + assert field1.name == 'field1' + assert field1.type == 'string' + assert field1.required is True + assert field1.description == 'Test field' + + field2 = api_config.payload_schema.fields[1] + assert field2.name == 'field2' + assert field2.type == 'integer' + assert field2.required is False + + +def test_parse_payload_schema_no_schema(): + """Test parsing API without payload schema.""" + yaml_content = """ +service: + id: test-service + base_url: https://api.test.com + auth: + id: test-auth + type: bearer + token: test-token + apis: + - id: test-api + path: /test + backend_path: /test + method: GET +""" + + service_def = ServiceDefinitionParser.parse_yaml_string(yaml_content) + api_config = service_def.apis[0] + + assert api_config.payload_schema is None + + +def test_parse_payload_schema_invalid_type(): + """Test parsing payload schema with invalid type.""" + yaml_content = """ +service: + id: test-service + base_url: https://api.test.com + auth: + id: test-auth + type: bearer + token: test-token + apis: + - id: test-api + path: /test + backend_path: /test + method: POST + payload_schema: + fields: + - name: field1 + type: invalid_type + required: true +""" + + with pytest.raises(ValueError) as exc_info: + ServiceDefinitionParser.parse_yaml_string(yaml_content) + + assert "Invalid payload field type 'invalid_type'" in str(exc_info.value) + + +def test_parse_payload_schema_missing_name(): + """Test parsing payload schema with missing field name.""" + yaml_content = """ +service: + id: test-service + base_url: https://api.test.com + auth: + id: test-auth + type: bearer + token: test-token + apis: + - id: test-api + path: /test + backend_path: /test + method: POST + payload_schema: + fields: + - type: string + required: true +""" + + with pytest.raises(ValueError) as exc_info: + ServiceDefinitionParser.parse_yaml_string(yaml_content) + + assert 'missing required attribute: name' in str(exc_info.value) + + +# ============================================================================ +# Validation Stage Tests +# ============================================================================ + + +@pytest.mark.asyncio +async def test_valid_payload_passes(): + """Test that valid payload passes validation.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='name', type='string', required=True), + PayloadFieldSchema(name='age', type='integer', required=False), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + context = PipelineContext(method='POST', body={'name': 'John Doe', 'age': 30}) + + result = await stage.execute(context) + assert result is context # No exception raised + + +@pytest.mark.asyncio +async def test_missing_required_field_fails(): + """Test that missing required field raises validation error.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='name', type='string', required=True), + PayloadFieldSchema(name='email', type='string', required=True), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + context = PipelineContext( + method='POST', + body={'name': 'John Doe'}, # Missing 'email' + ) + + with pytest.raises(PipelineException) as exc_info: + await stage.execute(context) + + assert 'validation failed' in str(exc_info.value).lower() + assert "Missing required field 'email'" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_wrong_field_type_fails(): + """Test that wrong field type raises validation error.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='age', type='integer', required=True), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + context = PipelineContext( + method='POST', + body={'age': 'thirty'}, # String instead of integer + ) + + with pytest.raises(PipelineException) as exc_info: + await stage.execute(context) + + assert 'expected type integer but got str' in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_extra_fields_allowed(): + """Test that extra fields not in schema are allowed.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='name', type='string', required=True), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + context = PipelineContext( + method='POST', body={'name': 'John Doe', 'extra_field': 'value', 'another': 123} + ) + + result = await stage.execute(context) + assert result is context # No exception raised + + +@pytest.mark.asyncio +async def test_validation_skipped_for_get_method(): + """Test that validation is skipped for GET requests.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='name', type='string', required=True), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.GET, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + context = PipelineContext(method='GET', body={'invalid': 'data'}) + + result = await stage.execute(context) + assert result is context # Validation skipped + + +@pytest.mark.asyncio +async def test_validation_applies_to_post(): + """Test that validation applies to POST requests.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='name', type='string', required=True), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + context = PipelineContext( + method='POST', + body={}, # Missing required field + ) + + with pytest.raises(PipelineException): + await stage.execute(context) + + +@pytest.mark.asyncio +async def test_validation_applies_to_put(): + """Test that validation applies to PUT requests.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='name', type='string', required=True), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.PUT, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + context = PipelineContext( + method='PUT', + body={}, # Missing required field + ) + + with pytest.raises(PipelineException): + await stage.execute(context) + + +@pytest.mark.asyncio +async def test_validation_applies_to_patch(): + """Test that validation applies to PATCH requests.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='name', type='string', required=True), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.PATCH, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + context = PipelineContext( + method='PATCH', + body={}, # Missing required field + ) + + with pytest.raises(PipelineException): + await stage.execute(context) + + +@pytest.mark.asyncio +async def test_validation_skipped_when_no_schema(): + """Test that validation is skipped when no schema is defined.""" + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=None, + ) + + stage = PayloadValidatorStage(api_config) + context = PipelineContext(method='POST', body={'any': 'data'}) + + result = await stage.execute(context) + assert result is context # No validation performed + + +@pytest.mark.asyncio +async def test_complex_type_object(): + """Test validation of object type.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='user', type='object', required=True), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + + # Valid object + context = PipelineContext(method='POST', body={'user': {'name': 'John', 'age': 30}}) + result = await stage.execute(context) + assert result is context + + # Invalid (not an object) + context_invalid = PipelineContext(method='POST', body={'user': 'not an object'}) + with pytest.raises(PipelineException) as exc_info: + await stage.execute(context_invalid) + assert 'expected type object but got str' in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_complex_type_array(): + """Test validation of array type.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='items', type='array', required=True), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + + # Valid array + context = PipelineContext(method='POST', body={'items': [1, 2, 3]}) + result = await stage.execute(context) + assert result is context + + # Invalid (not an array) + context_invalid = PipelineContext(method='POST', body={'items': 'not an array'}) + with pytest.raises(PipelineException) as exc_info: + await stage.execute(context_invalid) + assert 'expected type array but got str' in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_multiple_validation_errors(): + """Test that multiple validation errors are reported together.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='name', type='string', required=True), + PayloadFieldSchema(name='age', type='integer', required=True), + PayloadFieldSchema(name='active', type='boolean', required=False), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + context = PipelineContext( + method='POST', + body={'age': 'thirty', 'active': 'yes'}, # Missing 'name', wrong types + ) + + with pytest.raises(PipelineException) as exc_info: + await stage.execute(context) + + error_message = str(exc_info.value) + assert "Missing required field 'name'" in error_message + assert 'expected type integer but got str' in error_message + assert 'expected type boolean but got str' in error_message + + +@pytest.mark.asyncio +async def test_all_basic_types(): + """Test validation of all basic types.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='str_field', type='string', required=True), + PayloadFieldSchema(name='int_field', type='integer', required=True), + PayloadFieldSchema(name='num_field', type='number', required=True), + PayloadFieldSchema(name='bool_field', type='boolean', required=True), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + + # All valid types + context = PipelineContext( + method='POST', + body={ + 'str_field': 'hello', + 'int_field': 42, + 'num_field': 3.14, + 'bool_field': True, + }, + ) + result = await stage.execute(context) + assert result is context + + +@pytest.mark.asyncio +async def test_integer_vs_number_type(): + """Test distinction between integer and number types.""" + # Integer type should NOT accept floats + int_schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='value', type='integer', required=True), + ] + ) + + int_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=int_schema, + ) + + int_stage = PayloadValidatorStage(int_config) + + # Valid integer + context_int = PipelineContext(method='POST', body={'value': 42}) + await int_stage.execute(context_int) + + # Invalid (float for integer) + context_float = PipelineContext(method='POST', body={'value': 3.14}) + with pytest.raises(PipelineException): + await int_stage.execute(context_float) + + # Number type should accept both int and float + num_schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='value', type='number', required=True), + ] + ) + + num_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=num_schema, + ) + + num_stage = PayloadValidatorStage(num_config) + + # Valid integer + context_int = PipelineContext(method='POST', body={'value': 42}) + await num_stage.execute(context_int) + + # Valid float + context_float = PipelineContext(method='POST', body={'value': 3.14}) + await num_stage.execute(context_float) + + +@pytest.mark.asyncio +async def test_null_value_treated_as_missing(): + """Test that null values are treated as missing fields.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='required_field', type='string', required=True), + PayloadFieldSchema(name='optional_field', type='string', required=False), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + + # Null for required field should fail + context = PipelineContext(method='POST', body={'required_field': None}) + + with pytest.raises(PipelineException) as exc_info: + await stage.execute(context) + assert "Missing required field 'required_field'" in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_no_body_with_required_fields(): + """Test that no body with required fields raises error.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='name', type='string', required=True), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + context = PipelineContext(method='POST', body=None) + + with pytest.raises(PipelineException) as exc_info: + await stage.execute(context) + assert 'Missing required field' in str(exc_info.value) + + +@pytest.mark.asyncio +async def test_empty_string_valid_for_string_type(): + """Test that empty string is valid for string type.""" + schema = PayloadSchema( + fields=[ + PayloadFieldSchema(name='text', type='string', required=True), + ] + ) + + api_config = ApiConfig( + id='test-api', + path='/test', + backend_path='/test', + method=HttpMethod.POST, + payload_schema=schema, + ) + + stage = PayloadValidatorStage(api_config) + context = PipelineContext(method='POST', body={'text': ''}) + + result = await stage.execute(context) + assert result is context # Empty string is valid + + +# ============================================================================ +# Integration Tests +# ============================================================================ + + +@pytest.mark.asyncio +async def test_proxy_returns_400_for_validation_error(validation_service_registry): + """Test that proxy returns HTTP 400 for validation errors.""" + + proxy = ApiProxy(validation_service_registry) + + # Missing required fields + response = await proxy.process_request( + service_id='test-validation-service', + api_id='create-user', + api_version='v1', + method='POST', + path='/users', + body={'name': 'John'}, # Missing 'email' + ) + + assert response.http_status_code == 400 + assert response.meta['status'] == 'validation_error' + assert "Missing required field 'email'" in response.meta['message'] + + +@pytest.mark.asyncio +async def test_proxy_successful_with_valid_payload(validation_service_registry): + """Test that proxy succeeds with valid payload.""" + from unittest.mock import AsyncMock, Mock, patch + + proxy = ApiProxy(validation_service_registry) + + # Mock the httpx client + mock_response = Mock() + mock_response.status_code = 200 + mock_response.headers = {'content-type': 'application/json'} + mock_response.json.return_value = { + 'id': 1, + 'name': 'John', + 'email': 'john@test.com', + } + + with patch('httpx.AsyncClient') as mock_client_class: + mock_client = AsyncMock() + mock_client.request = AsyncMock(return_value=mock_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + mock_client_class.return_value = mock_client + + response = await proxy.process_request( + service_id='test-validation-service', + api_id='create-user', + api_version='v1', + method='POST', + path='/users', + body={'name': 'John', 'email': 'john@test.com', 'age': 30}, + ) + + assert response.http_status_code == 200 + assert response.meta['status'] == 'success' + + +@pytest.mark.asyncio +async def test_validation_in_full_pipeline(validation_service_registry): + """Test that validation is properly integrated in the pipeline.""" + + proxy = ApiProxy(validation_service_registry) + + # Test with type error + response = await proxy.process_request( + service_id='test-validation-service', + api_id='create-user', + api_version='v1', + method='POST', + path='/users', + body={'name': 'John', 'email': 'john@test.com', 'age': 'thirty'}, # Wrong type + ) + + assert response.http_status_code == 400 + assert 'expected type integer but got str' in response.meta['message'] diff --git a/wavefront/server/modules/tools_module/tools_module/api_service/provider.py b/wavefront/server/modules/tools_module/tools_module/api_service/provider.py new file mode 100644 index 00000000..f7e8d75f --- /dev/null +++ b/wavefront/server/modules/tools_module/tools_module/api_service/provider.py @@ -0,0 +1,218 @@ +from typing import List, Dict, Any +import re +from tools_module.interfaces.tool_details_provider import ToolDetailsProvider +from tools_module.models.tool_schemas import ToolExecutionDetails +from api_services_module.core.manager import ApiServicesManager +from common_module.log.logger import logger +from api_services_module.config.parser import ServiceDefinitionParser + + +class ApiServiceToolDetailsProvider(ToolDetailsProvider): + """Provider for expanding API service tools from database + YAML definitions""" + + def __init__(self, api_services_manager: ApiServicesManager): + self.api_services_manager = api_services_manager + + def can_handle(self, category: str) -> bool: + return category == 'api_service' + + async def get_tool_details( + self, tool_metadata: Dict[str, Any] + ) -> List[ToolExecutionDetails]: + """ + Expand the trigger_api_service template into individual tools. + + For each API in each service: + 1. Fetch all API services from database + 2. Load YAML definitions from cloud storage + 3. Parse each API's payload_schema, path params, and query params + 4. Create a ToolExecutionDetails with API-specific parameters + """ + tool_details = [] + + # Fetch all API services from database + all_services = await self.api_services_manager.get_all_api_services() + + if not all_services: + logger.info('No API services found in database') + return tool_details + + for service in all_services: + try: + # Skip inactive services + if not service.is_active: + logger.debug(f'Skipping inactive service: {service.id}') + continue + + # Load YAML content from cloud storage + yaml_content = self.api_services_manager.fetch_service_def(service) + + # Parse YAML to ServiceDefinition + service_def = ServiceDefinitionParser.parse_yaml_string(yaml_content) + + # Create a tool for each API in the service + for api_config in service_def.apis: + try: + tool_name = self._generate_tool_name( + service_def.id, api_config.id + ) + parameters = self._build_parameters(api_config) + required = self._extract_required_params(api_config) + description = self._build_description(service_def, api_config) + + tool_details.append( + ToolExecutionDetails( + name=tool_name, + resource_name=f'{service_def.id}/{api_config.id}', + prefill_parameter_names=[ + 'api_service_id', + 'api_id', + 'api_version', + ], + prefilled_value={ + 'api_service_id': service_def.id, + 'api_id': api_config.id, + 'api_version': api_config.version, + }, + required=required, + parameters=parameters, + description=description, + category='api_service', + ) + ) + logger.debug( + f'Created tool: {tool_name} for service {service_def.id}' + ) + except Exception as e: + logger.warning( + f'Error creating tool for API {api_config.id} in service {service_def.id}: {str(e)}, skipping' + ) + continue + + except Exception as e: + logger.warning( + f'Error loading service {service.id}: {str(e)}, skipping' + ) + continue + + logger.info(f'Generated {len(tool_details)} API service tools') + return tool_details + + def _generate_tool_name(self, service_id: str, api_id: str) -> str: + """Generate tool name from service_id and api_id.""" + # Use _ separator between service and API + # Example: "my-api-service" + "get-user" → "my-api-service_get-user" + return f'{service_id}_{api_id}' + + def _build_parameters(self, api_config) -> Dict[str, Any]: + """ + Build tool parameters from API configuration. + + Parameters include: + 1. Payload schema fields (from payload_schema) + 2. Path parameters (extracted from path template) + 3. Query parameters (from backend_query_params) + """ + parameters = {} + + # Add prefilled parameters (not shown to user but needed for execution) + parameters['api_service_id'] = { + 'type': 'string', + 'description': 'ID of the API service (automatically filled)', + } + parameters['api_id'] = { + 'type': 'string', + 'description': 'ID of the API endpoint (automatically filled)', + } + parameters['api_version'] = { + 'type': 'string', + 'description': 'API version (automatically filled)', + } + + # Extract path parameters from path template + path_params = self._extract_path_params(api_config.path) + for param_name in path_params: + parameters[f'path_{param_name}'] = { + 'type': 'string', + 'description': f'Path parameter: {param_name}', + } + + # Add query parameters from backend_query_params + for param_name, default_value in api_config.backend_query_params.items(): + param_type = self._infer_type(default_value) + parameters[f'query_{param_name}'] = { + 'type': param_type, + 'description': f'Query parameter: {param_name}', + } + + # Add payload schema fields + if api_config.payload_schema: + for field in api_config.payload_schema.fields: + parameters[field.name] = { + 'type': field.type, + 'description': field.description or f'Payload field: {field.name}', + } + + return parameters + + def _extract_required_params(self, api_config) -> List[str]: + """Extract required parameter names.""" + required = [] + + # Path parameters are always required + path_params = self._extract_path_params(api_config.path) + required.extend([f'path_{param}' for param in path_params]) + + # Required payload fields + if api_config.payload_schema: + for field in api_config.payload_schema.fields: + if field.required: + required.append(field.name) + + return required + + def _extract_path_params(self, path: str) -> List[str]: + """ + Extract parameter names from path template. + + Example: "/users/{user_id}/posts/{post_id}" → ["user_id", "post_id"] + """ + # Use regex to find all {param_name} patterns + pattern = r'\{([^}]+)\}' + matches = re.findall(pattern, path) + return matches + + def _infer_type(self, value: Any) -> str: + """Infer JSON schema type from Python value.""" + if isinstance(value, bool): + return 'boolean' + elif isinstance(value, int): + return 'integer' + elif isinstance(value, float): + return 'number' + elif isinstance(value, list): + return 'array' + elif isinstance(value, dict): + return 'object' + else: + return 'string' + + def _build_description(self, service_def, api_config) -> str: + """Build tool description from service and API metadata.""" + # Use the API's description if available, otherwise build a default description + if api_config.description: + return api_config.description + + # Fallback to default description + desc_parts = [ + f'Execute {service_def.id} API: {api_config.id}.', + f'Method: {api_config.method.value}.', + ] + + # Add API-specific description if available from payload schema + if api_config.payload_schema and api_config.payload_schema.fields: + desc_parts.append( + f'Accepts {len(api_config.payload_schema.fields)} parameter(s).' + ) + + return ' '.join(desc_parts) diff --git a/wavefront/server/modules/tools_module/tools_module/available_tools.json b/wavefront/server/modules/tools_module/tools_module/available_tools.json index 0fe8e069..380e2a74 100644 --- a/wavefront/server/modules/tools_module/tools_module/available_tools.json +++ b/wavefront/server/modules/tools_module/tools_module/available_tools.json @@ -167,5 +167,17 @@ ], "required": [], "category": "message_processor" + }, + "trigger_api_service": { + "name": "trigger_api_service", + "description": "Execute an API service. This template gets expanded into individual tools for each API.", + "parameters": {}, + "prefill_values": [ + "api_service_id", + "api_id", + "api_version" + ], + "required": [], + "category": "api_service" } } diff --git a/wavefront/server/modules/tools_module/tools_module/registry/function_registry.py b/wavefront/server/modules/tools_module/tools_module/registry/function_registry.py index 4a7d776e..208c28d0 100644 --- a/wavefront/server/modules/tools_module/tools_module/registry/function_registry.py +++ b/wavefront/server/modules/tools_module/tools_module/registry/function_registry.py @@ -16,6 +16,9 @@ from tools_module.registry.registries.message_processor_registry import ( MESSAGE_PROCESSOR_REGISTRY, ) +from tools_module.registry.registries.api_service_registry import ( + API_SERVICE_REGISTRY, +) # TODO: Import other category registries as they are implemented @@ -41,6 +44,7 @@ def _merge_registries(*registries): EMAIL_REGISTRY, UTIL_FUNCTION_REGISTRY, MESSAGE_PROCESSOR_REGISTRY, + API_SERVICE_REGISTRY, ) diff --git a/wavefront/server/modules/tools_module/tools_module/registry/registries/api_service_registry.py b/wavefront/server/modules/tools_module/tools_module/registry/registries/api_service_registry.py new file mode 100644 index 00000000..958c828e --- /dev/null +++ b/wavefront/server/modules/tools_module/tools_module/registry/registries/api_service_registry.py @@ -0,0 +1,16 @@ +""" +API Service Tools Registry + +Contains the mapping from API service tool names to their execution function. +Since all API services use the same execution function (with different IDs), +this registry uses a single trigger function. +""" + +from tools_module.utils.api_service_fn import execute_api_service_fn + + +# For API services, we use a single execution function +# The actual service and API are selected via the api_service_id and api_id parameters +API_SERVICE_REGISTRY = { + 'trigger_api_service': execute_api_service_fn, +} diff --git a/wavefront/server/modules/tools_module/tools_module/tools_container.py b/wavefront/server/modules/tools_module/tools_module/tools_container.py index abf8f4ac..82cd4a6c 100644 --- a/wavefront/server/modules/tools_module/tools_module/tools_container.py +++ b/wavefront/server/modules/tools_module/tools_module/tools_container.py @@ -7,6 +7,7 @@ from tools_module.datasources.provider import DatasourceToolDetailsProvider from tools_module.knowlegebase.provider import KnowledgeBaseToolDetailsProvider from tools_module.message_processor.provider import MessageProcessorToolDetailsProvider +from tools_module.api_service.provider import ApiServiceToolDetailsProvider from tools_module.services.default_tool_provider import DefaultToolDetailsProvider @@ -17,6 +18,7 @@ class ToolsContainer(containers.DeclarativeContainer): knowledge_base_repository = providers.Dependency() knowledge_base_inference_repository = providers.Dependency() message_processor_repository = providers.Dependency() + api_services_manager = providers.Dependency() cloud_manager = providers.Dependency() message_processor_bucket_name = providers.Dependency() # Tool loader @@ -43,6 +45,11 @@ class ToolsContainer(containers.DeclarativeContainer): message_processor_bucket_name=message_processor_bucket_name, ) + api_service_tool_provider = providers.Singleton( + ApiServiceToolDetailsProvider, + api_services_manager=api_services_manager, + ) + default_tool_provider = providers.Singleton(DefaultToolDetailsProvider) # Tool service @@ -53,6 +60,7 @@ class ToolsContainer(containers.DeclarativeContainer): datasource_tool_provider, knowledge_base_tool_provider, message_processor_tool_provider, + api_service_tool_provider, default_tool_provider, ), ) diff --git a/wavefront/server/modules/tools_module/tools_module/utils/api_service_fn.py b/wavefront/server/modules/tools_module/tools_module/utils/api_service_fn.py index 6a437300..5aba6cf5 100644 --- a/wavefront/server/modules/tools_module/tools_module/utils/api_service_fn.py +++ b/wavefront/server/modules/tools_module/tools_module/utils/api_service_fn.py @@ -1,5 +1,4 @@ import json -from typing import Optional, Dict, Any from api_services_module.execution.execute import execute_api_service @@ -7,15 +6,45 @@ async def execute_api_service_fn( api_service_id: str, api_id: str, api_version: str = 'v1', - headers: Optional[Dict[str, str]] = None, - payload: Optional[Dict[str, Any]] = None, - query_params: Optional[Dict[str, Any]] = None, - path_params: Optional[Dict[str, Any]] = None, - variables: Optional[Dict[str, Any]] = None, + **kwargs, # Dynamic parameters for path_params, query_params, and payload ) -> str: - """Process a message using the message processor function""" + """ + Execute an API service function. - headers = headers or {} + Args: + api_service_id: ID of the API service (prefilled) + api_id: ID of the API endpoint (prefilled) + api_version: API version (prefilled, default: v1) + **kwargs: Dynamic parameters including: + - Payload schema fields (goes into payload) + - Path parameters (prefixed with 'path_', goes into path_params) + - Query parameters (prefixed with 'query_', goes into query_params) + + Returns: + Result from API service execution as string + """ + payload = {} + path_params = {} + query_params = {} + + # Extract special parameters if provided + headers = kwargs.pop('headers', None) + + # Distribute remaining kwargs based on naming convention + for key, value in kwargs.items(): + if key.startswith('path_'): + # Path parameters prefixed with 'path_' + path_params[key.replace('path_', '', 1)] = value + elif key.startswith('query_'): + # Query parameters prefixed with 'query_' + query_params[key.replace('query_', '', 1)] = value + else: + # Everything else goes to payload + payload[key] = value + + # Set default content-type + if headers is None: + headers = {} if not any(k.lower() == 'content-type' for k in headers.keys()): headers['content-type'] = 'application/json' @@ -23,9 +52,9 @@ async def execute_api_service_fn( api_service_id=api_service_id, api_id=api_id, api_version=api_version, - payload=payload, - query_params=query_params, - path_params=path_params, + payload=payload if payload else None, + query_params=query_params if query_params else None, + path_params=path_params if path_params else None, headers=headers, ) From b20defc094c166c05cc94c0e2d8cd3f4ab8c1037 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Mon, 29 Dec 2025 18:36:57 +0530 Subject: [PATCH 4/7] client/api_services - payload_schema support --- .../pages/apps/[appId]/api-services/[id].tsx | 178 +++++++++++++++++- 1 file changed, 177 insertions(+), 1 deletion(-) diff --git a/wavefront/client/src/pages/apps/[appId]/api-services/[id].tsx b/wavefront/client/src/pages/apps/[appId]/api-services/[id].tsx index e3bb279b..08c9ed1a 100644 --- a/wavefront/client/src/pages/apps/[appId]/api-services/[id].tsx +++ b/wavefront/client/src/pages/apps/[appId]/api-services/[id].tsx @@ -36,6 +36,13 @@ const keyValuePairSchema = z.object({ value: z.string(), }); +const payloadFieldSchema = z.object({ + name: z.string(), + type: z.enum(['string', 'integer', 'number', 'boolean', 'array', 'object']), + required: z.boolean(), + description: z.string(), +}); + const apiEndpointSchema = z.object({ id: z.string(), version: z.string(), @@ -46,6 +53,11 @@ const apiEndpointSchema = z.object({ backend_query_params: z.array(keyValuePairSchema), output_mapper_enabled: z.boolean(), output_mapper: z.array(keyValuePairSchema), + payload_schema: z + .object({ + fields: z.array(payloadFieldSchema), + }) + .optional(), }); const apiServiceFormSchema = z.object({ @@ -163,7 +175,7 @@ const ApiServiceDetail: React.FC = () => { }); } - return { + const result: any = { id: api.id, version: api.version, path: api.path, @@ -181,6 +193,24 @@ const ApiServiceDetail: React.FC = () => { output_mapper: outputMapperObj, }), }; + + // Add payload_schema for POST, PUT, PATCH methods + if ( + ['POST', 'PUT', 'PATCH'].includes(api.method) && + api.payload_schema?.fields && + api.payload_schema.fields.length > 0 + ) { + result.payload_schema = { + fields: api.payload_schema.fields.map((field) => ({ + name: field.name, + type: field.type, + required: field.required, + ...(field.description && { description: field.description }), + })), + }; + } + + return result; }), }, }; @@ -237,6 +267,17 @@ const ApiServiceDetail: React.FC = () => { value: String(value), })); + const payload_schema = api.payload_schema?.fields + ? { + fields: api.payload_schema.fields.map((field: any) => ({ + name: field.name || '', + type: field.type || 'string', + required: field.required || false, + description: field.description || '', + })), + } + : { fields: [] }; + return { id: api.id || '', version: api.version || 'v1', @@ -247,6 +288,7 @@ const ApiServiceDetail: React.FC = () => { backend_query_params: backend_query_params, output_mapper_enabled: api.output_mapper_enabled || false, output_mapper: output_mapper, + payload_schema: payload_schema, }; } ), @@ -299,6 +341,17 @@ const ApiServiceDetail: React.FC = () => { value: String(value), })); + const payload_schema = api.payload_schema?.fields + ? { + fields: api.payload_schema.fields.map((field: any) => ({ + name: field.name || '', + type: field.type || 'string', + required: field.required || false, + description: field.description || '', + })), + } + : { fields: [] }; + return { id: api.id, version: api.version, @@ -309,6 +362,7 @@ const ApiServiceDetail: React.FC = () => { backend_query_params: backend_query_params, output_mapper_enabled: api.output_mapper_enabled || false, output_mapper: output_mapper, + payload_schema: payload_schema, }; }), }; @@ -432,6 +486,9 @@ const ApiServiceDetail: React.FC = () => { backend_query_params: [], output_mapper_enabled: false, output_mapper: [], + payload_schema: { + fields: [], + }, }); }; @@ -478,6 +535,22 @@ const ApiServiceDetail: React.FC = () => { ); }; + const handleAddPayloadField = (apiIndex: number) => { + const currentFields = form.getValues(`apis.${apiIndex}.payload_schema.fields`) || []; + form.setValue(`apis.${apiIndex}.payload_schema.fields`, [ + ...currentFields, + { name: '', type: 'string', required: false, description: '' }, + ]); + }; + + const handleRemovePayloadField = (apiIndex: number, fieldIndex: number) => { + const currentFields = form.getValues(`apis.${apiIndex}.payload_schema.fields`) || []; + form.setValue( + `apis.${apiIndex}.payload_schema.fields`, + currentFields.filter((_, i) => i !== fieldIndex) + ); + }; + return (
@@ -1024,6 +1097,109 @@ const ApiServiceDetail: React.FC = () => {
+ {/* Payload Schema - Only for POST, PUT, PATCH */} + {['POST', 'PUT', 'PATCH'].includes(form.watch(`apis.${index}.method`)) && ( +
+
+ + +
+
+ {(form.watch(`apis.${index}.payload_schema.fields`) || []).map( + (_field: unknown, fIndex: number) => ( +
+ ( + + + + + + + )} + /> + ( + + + + + )} + /> + ( + + + + + Required + + )} + /> + ( + + + + + + + )} + /> + +
+ ) + )} +
+
+ )} + {/* Output Mapper */}
From 463749faaf8e29e12ce22ae2f9065525ee9ec3fc Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Tue, 30 Dec 2025 15:31:58 +0530 Subject: [PATCH 5/7] fixed comments --- .../services/agent_crud_service.py | 121 +---------- .../services/agent_inference_service.py | 120 +---------- .../api_services_module/config/parser.py | 4 +- .../tools_module/api_service/provider.py | 125 +----------- .../message_processor/provider.py | 4 +- .../utils/api_service_tool_loader.py | 189 ++++++++++++++++++ 6 files changed, 205 insertions(+), 358 deletions(-) create mode 100644 wavefront/server/modules/tools_module/tools_module/utils/api_service_tool_loader.py diff --git a/wavefront/server/modules/agents_module/agents_module/services/agent_crud_service.py b/wavefront/server/modules/agents_module/agents_module/services/agent_crud_service.py index 3a717a31..11962f27 100644 --- a/wavefront/server/modules/agents_module/agents_module/services/agent_crud_service.py +++ b/wavefront/server/modules/agents_module/agents_module/services/agent_crud_service.py @@ -1,6 +1,5 @@ import json import yaml -import re from typing import List, Optional from uuid import UUID @@ -21,8 +20,8 @@ from agents_module.utils.validation_utils import validate_agent_workflow_name from flo_ai import AgentBuilder from flo_ai.tool.base_tool import Tool +from tools_module.utils.api_service_tool_loader import load_api_service_tool from api_services_module.core.manager import ApiServicesManager -from api_services_module.config.parser import ServiceDefinitionParser class AgentCrudService: @@ -217,123 +216,7 @@ async def _try_load_api_service_tool(self, tool_name: str) -> Optional[Tool]: Returns: Tool object if API service found, None otherwise """ - from tools_module.utils.api_service_fn import execute_api_service_fn - - try: - # Check if api_services_manager is available - if not self.api_services_manager: - return None - - # Parse tool name to extract service_id and api_id - if '_' not in tool_name: - return None - - parts = tool_name.split('_', 1) - service_id = parts[0] - api_id = parts[1] - - # Query API service by id - service = await self.api_services_manager.get_api_service(id=service_id) - - if not service or not service.is_active: - return None - - # Load YAML content - yaml_content = self.api_services_manager.fetch_service_def(service) - - # Parse YAML to ServiceDefinition - service_def = ServiceDefinitionParser.parse_yaml_string(yaml_content) - - # Find the specific API config - api_config = service_def.get_api_by_id(api_id) - if not api_config: - return None - - # Build parameters dict for Tool - parameters = { - 'api_service_id': { - 'type': 'string', - 'description': 'ID of the API service (automatically filled)', - }, - 'api_id': { - 'type': 'string', - 'description': 'ID of the API endpoint (automatically filled)', - }, - 'api_version': { - 'type': 'string', - 'description': 'API version (automatically filled)', - }, - } - - # Extract path parameters from path template - path_params = self._extract_path_params(api_config.path) - for param_name in path_params: - parameters[f'path_{param_name}'] = { - 'type': 'string', - 'description': f'Path parameter: {param_name}', - } - - # Add query parameters from backend_query_params - for param_name, default_value in api_config.backend_query_params.items(): - param_type = self._infer_type(default_value) - parameters[f'query_{param_name}'] = { - 'type': param_type, - 'description': f'Query parameter: {param_name}', - } - - # Add payload schema fields - if api_config.payload_schema: - for field in api_config.payload_schema.fields: - parameters[field.name] = { - 'type': field.type, - 'description': field.description - or f'Payload field: {field.name}', - } - - # Build description - use API's description if available - if api_config.description: - description = api_config.description - else: - # Fallback to default description - description = f'Execute {service_def.id} API: {api_config.id}. Method: {api_config.method.value}.' - if api_config.payload_schema and api_config.payload_schema.fields: - description += f' Accepts {len(api_config.payload_schema.fields)} parameter(s).' - - # Create Tool object - tool = Tool( - name=tool_name, - description=description, - function=execute_api_service_fn, - parameters=parameters, - ) - - logger.info(f'Dynamically loaded API service tool: {tool_name}') - return tool - - except Exception as e: - logger.debug(f'API service {tool_name} not found: {str(e)}') - return None - - def _extract_path_params(self, path: str) -> list: - """Extract parameter names from path template.""" - pattern = r'\{([^}]+)\}' - matches = re.findall(pattern, path) - return matches - - def _infer_type(self, value) -> str: - """Infer JSON schema type from Python value.""" - if isinstance(value, bool): - return 'boolean' - elif isinstance(value, int): - return 'integer' - elif isinstance(value, float): - return 'number' - elif isinstance(value, list): - return 'array' - elif isinstance(value, dict): - return 'object' - else: - return 'string' + return await load_api_service_tool(tool_name, self.api_services_manager) async def create_agent( self, diff --git a/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py b/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py index 19f3fc1c..b8ffd603 100644 --- a/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py +++ b/wavefront/server/modules/agents_module/agents_module/services/agent_inference_service.py @@ -14,11 +14,9 @@ from common_module.log.logger import logger from tools_module.registry.tool_loader import ToolLoader from tools_module.utils.message_processor_fn import execute_message_processor_fn -from tools_module.utils.api_service_fn import execute_api_service_fn +from tools_module.utils.api_service_tool_loader import load_api_service_tool from api_services_module.core.manager import ApiServicesManager -from api_services_module.config.parser import ServiceDefinitionParser import yaml -import re class AgentInferenceService: @@ -203,121 +201,7 @@ async def _try_load_api_service_tool(self, tool_name: str) -> Optional[Tool]: Returns: Tool object if API service found, None otherwise """ - try: - # Check if api_services_manager is available - if not self.api_services_manager: - return None - - # Parse tool name to extract service_id and api_id - if '_' not in tool_name: - return None - - parts = tool_name.split('_', 1) - service_id = parts[0] - api_id = parts[1] - - # Query API service by id - service = await self.api_services_manager.get_api_service(id=service_id) - - if not service or not service.is_active: - return None - - # Load YAML content - yaml_content = self.api_services_manager.fetch_service_def(service) - - # Parse YAML to ServiceDefinition - service_def = ServiceDefinitionParser.parse_yaml_string(yaml_content) - - # Find the specific API config - api_config = service_def.get_api_by_id(api_id) - if not api_config: - return None - - # Build parameters dict for Tool - parameters = { - 'api_service_id': { - 'type': 'string', - 'description': 'ID of the API service (automatically filled)', - }, - 'api_id': { - 'type': 'string', - 'description': 'ID of the API endpoint (automatically filled)', - }, - 'api_version': { - 'type': 'string', - 'description': 'API version (automatically filled)', - }, - } - - # Extract path parameters from path template - path_params = self._extract_path_params(api_config.path) - for param_name in path_params: - parameters[f'path_{param_name}'] = { - 'type': 'string', - 'description': f'Path parameter: {param_name}', - } - - # Add query parameters from backend_query_params - for param_name, default_value in api_config.backend_query_params.items(): - param_type = self._infer_type(default_value) - parameters[f'query_{param_name}'] = { - 'type': param_type, - 'description': f'Query parameter: {param_name}', - } - - # Add payload schema fields - if api_config.payload_schema: - for field in api_config.payload_schema.fields: - parameters[field.name] = { - 'type': field.type, - 'description': field.description - or f'Payload field: {field.name}', - } - - # Build description - use API's description if available - if api_config.description: - description = api_config.description - else: - # Fallback to default description - description = f'Execute {service_def.id} API: {api_config.id}. Method: {api_config.method.value}.' - if api_config.payload_schema and api_config.payload_schema.fields: - description += f' Accepts {len(api_config.payload_schema.fields)} parameter(s).' - - # Create Tool object - tool = Tool( - name=tool_name, - description=description, - function=execute_api_service_fn, - parameters=parameters, - ) - - logger.info(f'Dynamically loaded API service tool: {tool_name}') - return tool - - except Exception as e: - logger.debug(f'API service {tool_name} not found: {str(e)}') - return None - - def _extract_path_params(self, path: str) -> list: - """Extract parameter names from path template.""" - pattern = r'\{([^}]+)\}' - matches = re.findall(pattern, path) - return matches - - def _infer_type(self, value) -> str: - """Infer JSON schema type from Python value.""" - if isinstance(value, bool): - return 'boolean' - elif isinstance(value, int): - return 'integer' - elif isinstance(value, float): - return 'number' - elif isinstance(value, list): - return 'array' - elif isinstance(value, dict): - return 'object' - else: - return 'string' + return await load_api_service_tool(tool_name, self.api_services_manager) def _create_llm_instance(self, config: LlmInferenceConfig): """ diff --git a/wavefront/server/modules/api_services_module/api_services_module/config/parser.py b/wavefront/server/modules/api_services_module/api_services_module/config/parser.py index 84da0321..0d70616f 100644 --- a/wavefront/server/modules/api_services_module/api_services_module/config/parser.py +++ b/wavefront/server/modules/api_services_module/api_services_module/config/parser.py @@ -1,7 +1,7 @@ """YAML service definition parser.""" import yaml -from typing import Dict, Any, List +from typing import Dict, Any, List, Optional from ..models.service import ( ServiceDefinition, AuthConfig, @@ -109,7 +109,7 @@ def _parse_auth_config(auth_data: Dict[str, Any]) -> AuthConfig: return auth_config @staticmethod - def _parse_payload_schema(schema_data: Dict[str, Any]) -> PayloadSchema: + def _parse_payload_schema(schema_data: Dict[str, Any]) -> Optional[PayloadSchema]: """Parse payload schema configuration.""" if not schema_data or 'fields' not in schema_data: return None diff --git a/wavefront/server/modules/tools_module/tools_module/api_service/provider.py b/wavefront/server/modules/tools_module/tools_module/api_service/provider.py index f7e8d75f..9f444fd8 100644 --- a/wavefront/server/modules/tools_module/tools_module/api_service/provider.py +++ b/wavefront/server/modules/tools_module/tools_module/api_service/provider.py @@ -1,7 +1,11 @@ from typing import List, Dict, Any -import re from tools_module.interfaces.tool_details_provider import ToolDetailsProvider from tools_module.models.tool_schemas import ToolExecutionDetails +from tools_module.utils.api_service_tool_loader import ( + build_tool_parameters, + extract_required_params, + build_tool_description, +) from api_services_module.core.manager import ApiServicesManager from common_module.log.logger import logger from api_services_module.config.parser import ServiceDefinitionParser @@ -56,9 +60,9 @@ async def get_tool_details( tool_name = self._generate_tool_name( service_def.id, api_config.id ) - parameters = self._build_parameters(api_config) - required = self._extract_required_params(api_config) - description = self._build_description(service_def, api_config) + parameters = build_tool_parameters(api_config) + required = extract_required_params(api_config) + description = build_tool_description(service_def, api_config) tool_details.append( ToolExecutionDetails( @@ -103,116 +107,3 @@ def _generate_tool_name(self, service_id: str, api_id: str) -> str: # Use _ separator between service and API # Example: "my-api-service" + "get-user" → "my-api-service_get-user" return f'{service_id}_{api_id}' - - def _build_parameters(self, api_config) -> Dict[str, Any]: - """ - Build tool parameters from API configuration. - - Parameters include: - 1. Payload schema fields (from payload_schema) - 2. Path parameters (extracted from path template) - 3. Query parameters (from backend_query_params) - """ - parameters = {} - - # Add prefilled parameters (not shown to user but needed for execution) - parameters['api_service_id'] = { - 'type': 'string', - 'description': 'ID of the API service (automatically filled)', - } - parameters['api_id'] = { - 'type': 'string', - 'description': 'ID of the API endpoint (automatically filled)', - } - parameters['api_version'] = { - 'type': 'string', - 'description': 'API version (automatically filled)', - } - - # Extract path parameters from path template - path_params = self._extract_path_params(api_config.path) - for param_name in path_params: - parameters[f'path_{param_name}'] = { - 'type': 'string', - 'description': f'Path parameter: {param_name}', - } - - # Add query parameters from backend_query_params - for param_name, default_value in api_config.backend_query_params.items(): - param_type = self._infer_type(default_value) - parameters[f'query_{param_name}'] = { - 'type': param_type, - 'description': f'Query parameter: {param_name}', - } - - # Add payload schema fields - if api_config.payload_schema: - for field in api_config.payload_schema.fields: - parameters[field.name] = { - 'type': field.type, - 'description': field.description or f'Payload field: {field.name}', - } - - return parameters - - def _extract_required_params(self, api_config) -> List[str]: - """Extract required parameter names.""" - required = [] - - # Path parameters are always required - path_params = self._extract_path_params(api_config.path) - required.extend([f'path_{param}' for param in path_params]) - - # Required payload fields - if api_config.payload_schema: - for field in api_config.payload_schema.fields: - if field.required: - required.append(field.name) - - return required - - def _extract_path_params(self, path: str) -> List[str]: - """ - Extract parameter names from path template. - - Example: "/users/{user_id}/posts/{post_id}" → ["user_id", "post_id"] - """ - # Use regex to find all {param_name} patterns - pattern = r'\{([^}]+)\}' - matches = re.findall(pattern, path) - return matches - - def _infer_type(self, value: Any) -> str: - """Infer JSON schema type from Python value.""" - if isinstance(value, bool): - return 'boolean' - elif isinstance(value, int): - return 'integer' - elif isinstance(value, float): - return 'number' - elif isinstance(value, list): - return 'array' - elif isinstance(value, dict): - return 'object' - else: - return 'string' - - def _build_description(self, service_def, api_config) -> str: - """Build tool description from service and API metadata.""" - # Use the API's description if available, otherwise build a default description - if api_config.description: - return api_config.description - - # Fallback to default description - desc_parts = [ - f'Execute {service_def.id} API: {api_config.id}.', - f'Method: {api_config.method.value}.', - ] - - # Add API-specific description if available from payload schema - if api_config.payload_schema and api_config.payload_schema.fields: - desc_parts.append( - f'Accepts {len(api_config.payload_schema.fields)} parameter(s).' - ) - - return ' '.join(desc_parts) diff --git a/wavefront/server/modules/tools_module/tools_module/message_processor/provider.py b/wavefront/server/modules/tools_module/tools_module/message_processor/provider.py index e301518d..24ea7e09 100644 --- a/wavefront/server/modules/tools_module/tools_module/message_processor/provider.py +++ b/wavefront/server/modules/tools_module/tools_module/message_processor/provider.py @@ -45,7 +45,7 @@ async def get_tool_details( for processor in all_processors: try: # Load YAML content from cloud storage - yaml_content = await self._load_yaml_content(processor) + yaml_content = self._load_yaml_content(processor) # Parse YAML to extract schema yaml_dict = yaml.safe_load(yaml_content) @@ -92,7 +92,7 @@ async def get_tool_details( return tool_details - async def _load_yaml_content(self, processor: MessageProcessors) -> str: + def _load_yaml_content(self, processor: MessageProcessors) -> str: """Load YAML content from cloud storage""" filepath = f'{self.prefix}/{processor.source}' yaml_bytes = self.cloud_manager.read_file( diff --git a/wavefront/server/modules/tools_module/tools_module/utils/api_service_tool_loader.py b/wavefront/server/modules/tools_module/tools_module/utils/api_service_tool_loader.py new file mode 100644 index 00000000..536ae162 --- /dev/null +++ b/wavefront/server/modules/tools_module/tools_module/utils/api_service_tool_loader.py @@ -0,0 +1,189 @@ +"""Utility for dynamically loading API service tools.""" + +import re +from typing import Optional, Dict, Any, List +from flo_ai.tool.base_tool import Tool +from api_services_module.core.manager import ApiServicesManager +from api_services_module.config.parser import ServiceDefinitionParser +from common_module.log.logger import logger +from tools_module.utils.api_service_fn import execute_api_service_fn + + +def extract_path_params(path: str) -> List[str]: + """ + Extract parameter names from path template. + + Example: "/users/{user_id}/posts/{post_id}" → ["user_id", "post_id"] + """ + pattern = r'\{([^}]+)\}' + matches = re.findall(pattern, path) + return matches + + +def infer_type(value: Any) -> str: + """Infer JSON schema type from Python value.""" + if isinstance(value, bool): + return 'boolean' + elif isinstance(value, int): + return 'integer' + elif isinstance(value, float): + return 'number' + elif isinstance(value, list): + return 'array' + elif isinstance(value, dict): + return 'object' + else: + return 'string' + + +def build_tool_parameters(api_config) -> Dict[str, Any]: + """ + Build tool parameters from API configuration. + + Parameters include: + 1. Payload schema fields (from payload_schema) + 2. Path parameters (extracted from path template) + 3. Query parameters (from backend_query_params) + """ + parameters = {} + + # Add prefilled parameters (not shown to user but needed for execution) + parameters['api_service_id'] = { + 'type': 'string', + 'description': 'ID of the API service (automatically filled)', + } + parameters['api_id'] = { + 'type': 'string', + 'description': 'ID of the API endpoint (automatically filled)', + } + parameters['api_version'] = { + 'type': 'string', + 'description': 'API version (automatically filled)', + } + + # Extract path parameters from path template + path_params = extract_path_params(api_config.path) + for param_name in path_params: + parameters[f'path_{param_name}'] = { + 'type': 'string', + 'description': f'Path parameter: {param_name}', + } + + # Add query parameters from backend_query_params + for param_name, default_value in api_config.backend_query_params.items(): + param_type = infer_type(default_value) + parameters[f'query_{param_name}'] = { + 'type': param_type, + 'description': f'Query parameter: {param_name}', + } + + # Add payload schema fields + if api_config.payload_schema: + for field in api_config.payload_schema.fields: + parameters[field.name] = { + 'type': field.type, + 'description': field.description or f'Payload field: {field.name}', + } + + return parameters + + +def extract_required_params(api_config) -> List[str]: + """Extract required parameter names from API configuration.""" + required = [] + + # Path parameters are always required + path_params = extract_path_params(api_config.path) + required.extend([f'path_{param}' for param in path_params]) + + # Required payload fields + if api_config.payload_schema: + for field in api_config.payload_schema.fields: + if field.required: + required.append(field.name) + + return required + + +def build_tool_description(service_def, api_config) -> str: + """Build tool description from service and API metadata.""" + # Use the API's description if available, otherwise build a default description + if api_config.description: + return api_config.description + + # Fallback to default description + desc_parts = [ + f'Execute {service_def.id} API: {api_config.id}.', + f'Method: {api_config.method.value}.', + ] + + # Add API-specific description if available from payload schema + if api_config.payload_schema and api_config.payload_schema.fields: + desc_parts.append( + f'Accepts {len(api_config.payload_schema.fields)} parameter(s).' + ) + + return ' '.join(desc_parts) + + +async def load_api_service_tool( + tool_name: str, api_services_manager: Optional[ApiServicesManager] +) -> Optional[Tool]: + """ + Attempt to load an API service as a Tool object. + + Args: + tool_name: Name of the tool in format "service_id_api_id" + api_services_manager: API services manager instance + + Returns: + Tool object if API service found, None otherwise + """ + try: + # Check if api_services_manager is available + if not api_services_manager: + return None + + # Parse tool name to extract service_id and api_id + if '_' not in tool_name: + return None + + parts = tool_name.split('_', 1) + service_id = parts[0] + api_id = parts[1] + + # Query API service by id + service = await api_services_manager.get_api_service(id=service_id) + + if not service or not service.is_active: + return None + + # Load YAML content + yaml_content = api_services_manager.fetch_service_def(service) + + # Parse YAML to ServiceDefinition + service_def = ServiceDefinitionParser.parse_yaml_string(yaml_content) + + # Find the specific API config + api_config = service_def.get_api_by_id(api_id) + if not api_config: + return None + + # Build parameters, description using shared helpers + parameters = build_tool_parameters(api_config) + description = build_tool_description(service_def, api_config) + + # Create Tool object + tool = Tool( + name=tool_name, + description=description, + function=execute_api_service_fn, + parameters=parameters, + ) + + logger.info(f'Dynamically loaded API service tool: {tool_name}') + return tool + + except Exception as e: + logger.debug(f'API service {tool_name} not found: {str(e)}') + return None From 4cc974109757dc356af3be5ad6dca1fe6a95b800 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Tue, 30 Dec 2025 16:05:30 +0530 Subject: [PATCH 6/7] api_services/description field show --- .../api-services/CreateApiServiceDialog.tsx | 1 + .../pages/apps/[appId]/api-services/[id].tsx | 26 +++++++++++++++++++ .../api_services_module/core/proxy.py | 1 + 3 files changed, 28 insertions(+) diff --git a/wavefront/client/src/pages/apps/[appId]/api-services/CreateApiServiceDialog.tsx b/wavefront/client/src/pages/apps/[appId]/api-services/CreateApiServiceDialog.tsx index d2c3ed4f..8d0b9784 100644 --- a/wavefront/client/src/pages/apps/[appId]/api-services/CreateApiServiceDialog.tsx +++ b/wavefront/client/src/pages/apps/[appId]/api-services/CreateApiServiceDialog.tsx @@ -48,6 +48,7 @@ const defaultYamlContent = `service: path: /users backend_path: /api/users method: GET + description: get users api output_mapper_enabled: false `; diff --git a/wavefront/client/src/pages/apps/[appId]/api-services/[id].tsx b/wavefront/client/src/pages/apps/[appId]/api-services/[id].tsx index 08c9ed1a..84b218c1 100644 --- a/wavefront/client/src/pages/apps/[appId]/api-services/[id].tsx +++ b/wavefront/client/src/pages/apps/[appId]/api-services/[id].tsx @@ -49,6 +49,7 @@ const apiEndpointSchema = z.object({ path: z.string(), backend_path: z.string(), method: z.enum(['GET', 'POST', 'PUT', 'DELETE', 'PATCH']), + description: z.string().optional(), additional_headers: z.array(keyValuePairSchema), backend_query_params: z.array(keyValuePairSchema), output_mapper_enabled: z.boolean(), @@ -181,6 +182,7 @@ const ApiServiceDetail: React.FC = () => { path: api.path, backend_path: api.backend_path, method: api.method, + ...(api.description && { description: api.description }), ...(Object.keys(apiHeadersObj).length > 0 && { additional_headers: apiHeadersObj, }), @@ -284,6 +286,7 @@ const ApiServiceDetail: React.FC = () => { path: api.path || '', backend_path: api.backend_path || '', method: (api.method as 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH') || 'GET', + description: api.description || '', additional_headers: api_additional_headers, backend_query_params: backend_query_params, output_mapper_enabled: api.output_mapper_enabled || false, @@ -358,6 +361,7 @@ const ApiServiceDetail: React.FC = () => { path: api.path, backend_path: api.backend_path || '', method: api.method as 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH', + description: api.description || '', additional_headers: api_additional_headers, backend_query_params: backend_query_params, output_mapper_enabled: api.output_mapper_enabled || false, @@ -482,6 +486,7 @@ const ApiServiceDetail: React.FC = () => { path: '', backend_path: '', method: 'GET', + description: '', additional_headers: [], backend_query_params: [], output_mapper_enabled: false, @@ -983,6 +988,27 @@ const ApiServiceDetail: React.FC = () => { />
+ {/* API Description */} +
+ ( + + Description (Optional) + + + + + + )} + /> +
+ {/* API Additional Headers */}
diff --git a/wavefront/server/modules/api_services_module/api_services_module/core/proxy.py b/wavefront/server/modules/api_services_module/api_services_module/core/proxy.py index 1a34fb3c..5e85373e 100644 --- a/wavefront/server/modules/api_services_module/api_services_module/core/proxy.py +++ b/wavefront/server/modules/api_services_module/api_services_module/core/proxy.py @@ -310,6 +310,7 @@ def get_service_info(self, service_id: str) -> Dict[str, Any]: 'path': api.path, 'method': api.method.value, 'backend_path': api.backend_path, + 'description': api.description, 'additional_headers': api.additional_headers, 'backend_query_params': api.backend_query_params, 'output_mapper_enabled': api.output_mapper_enabled, From c349e4b4666bf1addc23afe7782f1715c55c09d6 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Mon, 5 Jan 2026 11:47:47 +0530 Subject: [PATCH 7/7] upgraded flo-ai version --- flo_ai/pyproject.toml | 2 +- flo_ai/uv.lock | 4 ++-- .../server/modules/agents_module/pyproject.toml | 2 +- .../modules/knowledge_base_module/pyproject.toml | 2 +- wavefront/server/modules/tools_module/pyproject.toml | 2 +- wavefront/server/uv.lock | 12 ++++++------ 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/flo_ai/pyproject.toml b/flo_ai/pyproject.toml index 928575e4..2b976988 100644 --- a/flo_ai/pyproject.toml +++ b/flo_ai/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "flo_ai" -version = "1.1.0-rc6" +version = "1.1.0-rc7" description = "A easy way to create structured AI agents" authors = [{ name = "rootflo", email = "engineering.tools@rootflo.ai" }] requires-python = ">=3.10,<4.0" diff --git a/flo_ai/uv.lock b/flo_ai/uv.lock index 076ab59e..f9b039b9 100644 --- a/flo_ai/uv.lock +++ b/flo_ai/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.10, <4.0" resolution-markers = [ "python_full_version >= '3.14'", @@ -897,7 +897,7 @@ wheels = [ [[package]] name = "flo-ai" -version = "1.1.0rc6" +version = "1.1.0rc7" source = { editable = "." } dependencies = [ { name = "aiohttp" }, diff --git a/wavefront/server/modules/agents_module/pyproject.toml b/wavefront/server/modules/agents_module/pyproject.toml index 63019d89..e22b9858 100644 --- a/wavefront/server/modules/agents_module/pyproject.toml +++ b/wavefront/server/modules/agents_module/pyproject.toml @@ -13,7 +13,7 @@ dependencies = [ "flo-utils", "tools-module", "api-services-module", - "flo-ai==1.1.0-rc6", + "flo-ai==1.1.0-rc7", ] [tool.uv.sources] diff --git a/wavefront/server/modules/knowledge_base_module/pyproject.toml b/wavefront/server/modules/knowledge_base_module/pyproject.toml index 1368c711..43804159 100644 --- a/wavefront/server/modules/knowledge_base_module/pyproject.toml +++ b/wavefront/server/modules/knowledge_base_module/pyproject.toml @@ -17,7 +17,7 @@ dependencies = [ "pandas~=2.2.3", "ollama~=0.4.8", "textract~=1.6.5", - "flo-ai==1.1.0-rc6", + "flo-ai==1.1.0-rc7", "google-cloud-pubsub~=2.30.0", "boto3<=1.38.40", "pyyaml>=6.0.3,<7", diff --git a/wavefront/server/modules/tools_module/pyproject.toml b/wavefront/server/modules/tools_module/pyproject.toml index 179ff579..39e979c2 100644 --- a/wavefront/server/modules/tools_module/pyproject.toml +++ b/wavefront/server/modules/tools_module/pyproject.toml @@ -3,7 +3,7 @@ name = "tools_module" version = "0.1.0" description = "Tools module for Flo AI agent system" dependencies = [ - "flo-ai==1.1.0-rc6", + "flo-ai==1.1.0-rc7", "flo_cloud", "datasource", diff --git a/wavefront/server/uv.lock b/wavefront/server/uv.lock index eb2a04d7..169b3242 100644 --- a/wavefront/server/uv.lock +++ b/wavefront/server/uv.lock @@ -90,7 +90,7 @@ dependencies = [ requires-dist = [ { name = "api-services-module", editable = "modules/api_services_module" }, { name = "common-module", editable = "modules/common_module" }, - { name = "flo-ai", specifier = "==1.1.0rc6" }, + { name = "flo-ai", specifier = "==1.1.0rc7" }, { name = "flo-cloud", editable = "packages/flo_cloud" }, { name = "flo-utils", editable = "packages/flo_utils" }, { name = "tools-module", editable = "modules/tools_module" }, @@ -1356,7 +1356,7 @@ wheels = [ [[package]] name = "flo-ai" -version = "1.1.0rc6" +version = "1.1.0rc7" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aiohttp" }, @@ -1377,9 +1377,9 @@ dependencies = [ { name = "pypdf" }, { name = "pyyaml" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/28/43/16a20cacc46837d0a754df706d4e7ad91c07414617f35a947a143924538a/flo_ai-1.1.0rc6.tar.gz", hash = "sha256:07eeed7b7ed5ce283c72164df311480f529cdb28f304b16e63dd054d6ee9177b", size = 89942, upload-time = "2025-12-13T12:52:58.197Z" } +sdist = { url = "https://files.pythonhosted.org/packages/ca/ec/aed7bb08ee0112ced0094f9bf4e2824ea202d099a4f52927e9166d79b46a/flo_ai-1.1.0rc7.tar.gz", hash = "sha256:1e48cd48b98b0de6d4087dd4774c79e181310bb7430664ce3b3d81615a4d0d69", size = 89293, upload-time = "2026-01-05T06:11:47.252Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/f5/1e/3ef796ef1cf45235e68792731b832eb89f58b10e134597296f84001b1a7d/flo_ai-1.1.0rc6-py3-none-any.whl", hash = "sha256:2c47dc341930ea93a0a7a61d7d59a6e30068063742ff4b8052805dc4fb062bb1", size = 114499, upload-time = "2025-12-13T12:52:56.293Z" }, + { url = "https://files.pythonhosted.org/packages/26/38/46e80522f15f7398c0dc3a52850a0b5f3a06d7a0d9bd515569617a103472/flo_ai-1.1.0rc7-py3-none-any.whl", hash = "sha256:b7f54c057b6cea2b236ccb774244ebc64496836967316d73dd7ff47e4f017e8c", size = 113898, upload-time = "2026-01-05T06:11:57.557Z" }, ] [[package]] @@ -2576,7 +2576,7 @@ dev = [ requires-dist = [ { name = "boto3", specifier = "<=1.38.40" }, { name = "datasource", editable = "plugins/datasource" }, - { name = "flo-ai", specifier = "==1.1.0rc6" }, + { name = "flo-ai", specifier = "==1.1.0rc7" }, { name = "flo-cloud", editable = "packages/flo_cloud" }, { name = "google-cloud-pubsub", specifier = "~=2.30.0" }, { name = "numpy", specifier = ">=1.24,<2.0" }, @@ -5482,7 +5482,7 @@ dev = [ requires-dist = [ { name = "common-module", editable = "modules/common_module" }, { name = "datasource", editable = "plugins/datasource" }, - { name = "flo-ai", specifier = "==1.1.0rc6" }, + { name = "flo-ai", specifier = "==1.1.0rc7" }, { name = "flo-cloud", editable = "packages/flo_cloud" }, { name = "knowledge-base-module", editable = "modules/knowledge_base_module" }, { name = "plugins-module", editable = "modules/plugins_module" },