diff --git a/flo_ai/flo_ai/arium/arium.py b/flo_ai/flo_ai/arium/arium.py index d7d0fd62..f88bbeba 100644 --- a/flo_ai/flo_ai/arium/arium.py +++ b/flo_ai/flo_ai/arium/arium.py @@ -507,6 +507,7 @@ async def _execute_node( node_name=node.name, node_type=node_type, execution_time=execution_time, + node_output=self._serialize_node_output(result), ) return result @@ -561,6 +562,7 @@ async def _execute_node( node_name=node.name, node_type=node_type, execution_time=execution_time, + node_output=self._serialize_node_output(result), ) return result @@ -638,3 +640,25 @@ def _add_to_memory(self, message: MessageMemoryItem): Store message in memory """ self.memory.add(message) + + def _serialize_node_output(self, result: Any) -> Optional[str]: + if result is None: + return None + if isinstance(result, str): + return result + if isinstance(result, list): + parts = [self._serialize_node_output(item) for item in result] + return '\n'.join(p for p in parts if p) or None + if hasattr(result, 'content'): + return self._serialize_node_output(result.content) + if hasattr(result, 'text'): + return result.text + # DocumentMessageContent / ImageMessageContent — show url or type label + media_type = getattr(result, 'type', None) + if media_type in ('document', 'image'): + url = getattr(result, 'url', None) + mime = getattr(result, 'mime_type', None) + if url: + return f'[{media_type}: {url}]' + return f'[{media_type}{f": {mime}" if mime else ""}]' + return str(result) diff --git a/flo_ai/flo_ai/arium/events.py b/flo_ai/flo_ai/arium/events.py index 44dee2d7..30f10016 100644 --- a/flo_ai/flo_ai/arium/events.py +++ b/flo_ai/flo_ai/arium/events.py @@ -48,6 +48,7 @@ class AriumEvent: execution_time: Optional[float] = None error: Optional[str] = None router_choice: Optional[str] = None + node_output: Optional[str] = None metadata: Optional[dict] = None diff --git a/wavefront/client/src/components/InferencePopup.tsx b/wavefront/client/src/components/InferencePopup.tsx index f3da32dc..768f618d 100644 --- a/wavefront/client/src/components/InferencePopup.tsx +++ b/wavefront/client/src/components/InferencePopup.tsx @@ -254,6 +254,7 @@ const InferencePopup: React.FC = ({ onClose, renderModal = document_type: doc.documentType, document_base64: doc.base64Content, mime_type: doc.mimeType, + file_name: doc.file.name, metadata: { filename: doc.file.name, size: doc.file.size, diff --git a/wavefront/client/src/components/Stream.tsx b/wavefront/client/src/components/Stream.tsx index c75a3045..6ea8328c 100644 --- a/wavefront/client/src/components/Stream.tsx +++ b/wavefront/client/src/components/Stream.tsx @@ -10,6 +10,7 @@ interface StreamProps { execution_time?: number; error?: string; router_choice?: string; + node_output?: string; }>; isStreaming?: boolean; eventsContainerRef?: RefObject; @@ -74,6 +75,12 @@ const Stream: React.FC = ({ listenEventsEnabled, streamingEvents, i {'router_choice' in event && event.router_choice && (
Router choice: {event.router_choice}
)} + {'node_output' in event && event.node_output && ( +
+ Output: + {event.node_output} +
+ )} ))} diff --git a/wavefront/client/src/pages/apps/[appId]/agents/[id].tsx b/wavefront/client/src/pages/apps/[appId]/agents/[id].tsx index ce61394d..8da647b4 100644 --- a/wavefront/client/src/pages/apps/[appId]/agents/[id].tsx +++ b/wavefront/client/src/pages/apps/[appId]/agents/[id].tsx @@ -432,6 +432,7 @@ const AgentDetail: React.FC = () => { document_type: doc.documentType, document_base64: doc.base64Content, mime_type: doc.mimeType, + file_name: doc.file.name, metadata: { filename: doc.file.name, size: doc.file.size, diff --git a/wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx b/wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx index 30f71385..61ec711a 100644 --- a/wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx +++ b/wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx @@ -391,6 +391,7 @@ const WorkflowDetail: React.FC = () => { document_type: doc.documentType, document_base64: doc.base64Content, mime_type: doc.mimeType, + file_name: doc.file.name, metadata: { filename: doc.file.name, size: doc.file.size, diff --git a/wavefront/client/src/types/chat-message.ts b/wavefront/client/src/types/chat-message.ts index e80dbf36..ce93846a 100644 --- a/wavefront/client/src/types/chat-message.ts +++ b/wavefront/client/src/types/chat-message.ts @@ -8,6 +8,7 @@ export interface DocumentContent { document_type: string; document_base64?: string; mime_type?: string; + file_name?: string; metadata?: { filename?: string; size?: number; diff --git a/wavefront/client/src/types/workflow.ts b/wavefront/client/src/types/workflow.ts index a03f2b68..af422497 100644 --- a/wavefront/client/src/types/workflow.ts +++ b/wavefront/client/src/types/workflow.ts @@ -79,6 +79,7 @@ export interface WorkflowEventBase { execution_time?: number; error?: string; router_choice?: string; + node_output?: string; metadata?: Record; } @@ -105,6 +106,7 @@ export interface NodeCompletedEvent extends WorkflowEventBase { event_type: 'node_completed'; node_name: string; execution_time: number; + node_output?: string; } export interface NodeFailedEvent extends WorkflowEventBase { diff --git a/wavefront/server/apps/floconsole/floconsole/di/application_container.py b/wavefront/server/apps/floconsole/floconsole/di/application_container.py index 8a74f040..54e4a9d8 100644 --- a/wavefront/server/apps/floconsole/floconsole/di/application_container.py +++ b/wavefront/server/apps/floconsole/floconsole/di/application_container.py @@ -66,8 +66,12 @@ class ApplicationContainer(containers.DeclarativeContainer): app_user_repository=app_user_repository, ) - kms_service = providers.Singleton( - FloKmsService, cloud_provider=config.cloud_config.cloud_provider + kms_service = providers.Selector( + config.jwt_token.enable_cloud_kms, + true=providers.Singleton( + FloKmsService, cloud_provider=config.cloud_config.cloud_provider + ), + false=providers.Object(None), ) token_service = providers.Singleton( diff --git a/wavefront/server/modules/agents_module/agents_module/controllers/workflow_controller.py b/wavefront/server/modules/agents_module/agents_module/controllers/workflow_controller.py index 6c4c0292..456bde0d 100644 --- a/wavefront/server/modules/agents_module/agents_module/controllers/workflow_controller.py +++ b/wavefront/server/modules/agents_module/agents_module/controllers/workflow_controller.py @@ -6,6 +6,8 @@ from dependency_injector.wiring import inject, Provide import json import asyncio +import uuid +import time from common_module.log.logger import logger from common_module.response_formatter import ResponseFormatter @@ -75,9 +77,6 @@ async def workflow_inference( f'Starting inference for namespace: {namespace}, workflow_id: {workflow_id}, listen_events: {listen_events}' ) - # Extract user_id from authenticated session - user_id = request.state.session.user_id - # Extract authentication credentials access_token, app_key = extract_auth_credentials(request) @@ -89,27 +88,26 @@ async def workflow_inference( events_filter = None if listen_events or request_body.listen_events: - event_callback = create_workflow_event_callback(user_id, namespace, workflow_id) + execution_id = str(uuid.uuid4()) + event_callback = create_workflow_event_callback( + execution_id, namespace, workflow_id + ) events_filter = DEFAULT_EVENTS_FILTER logger.info( - f'Event streaming enabled for user {user_id}, workflow {namespace}/{workflow_id}' + f'Event streaming enabled for execution {execution_id}, workflow {namespace}/{workflow_id}' ) # Check if streaming is requested if listen_events or request_body.listen_events: logger.info( - f'Streaming inference for user {user_id}, workflow {namespace}/{workflow_id}' + f'Streaming inference for execution {execution_id}, workflow {namespace}/{workflow_id}' ) - # Get or create event queue for this user-workflow - event_queue = event_streamer.get_or_create_queue( - user_id, namespace, workflow_id - ) + event_queue = event_streamer.create_queue(execution_id) async def generate_inference_stream(): """Generate streaming inference with events and final output""" try: - # Start inference in background task inference_task = asyncio.create_task( workflow_inference_service.perform_inference( workflow_name=workflow_id, @@ -126,60 +124,54 @@ async def generate_inference_stream(): ) ) - # Stream events while workflow is running - workflow_completed = False - while not workflow_completed and not inference_task.done(): + # Stream events until inference completes + while not inference_task.done(): try: - # Wait for event with timeout event_data = await asyncio.wait_for( event_queue.get(), timeout=1.0 ) yield f'data: {json.dumps(event_data)}\n\n' - await asyncio.sleep(0.1) # remove it later - - # Check if workflow ended - if event_data.get('event_type') in [ - 'workflow_completed', - 'workflow_failed', - ]: - workflow_completed = True - except asyncio.TimeoutError: - # Continue waiting if no events continue - # Wait for inference to complete and get result + # Yield to the event loop so any ensure_future(add_event(...)) + # callbacks scheduled inside the inference task have a chance + # to run and enqueue their events before we drain. + await asyncio.sleep(0) + + # Drain any remaining events queued after task completion + while not event_queue.empty(): + event_data = event_queue.get_nowait() + yield f'data: {json.dumps(event_data)}\n\n' + result, execution_time = await inference_task - # Send final output event output_event = { 'event_type': 'output', 'result': result, 'workflow_id': workflow_id, 'namespace': namespace, 'execution_time': execution_time, - 'timestamp': asyncio.get_event_loop().time(), + 'timestamp': time.time(), } yield f'data: {json.dumps(output_event)}\n\n' - await asyncio.sleep(0.1) # remove it later logger.info( - f'Streaming inference completed for user {user_id}, workflow {namespace}/{workflow_id}' + f'Streaming inference completed for execution {execution_id}, workflow {namespace}/{workflow_id}' ) except Exception as e: logger.error( - f'Error in streaming inference for user {user_id}, workflow {namespace}/{workflow_id}: {e}' + f'Error in streaming inference for execution {execution_id}, workflow {namespace}/{workflow_id}: {e}' ) error_event = { 'event_type': 'error', 'error': str(e), - 'timestamp': asyncio.get_event_loop().time(), + 'timestamp': time.time(), } yield f'data: {json.dumps(error_event)}\n\n' finally: - # Clean up queue - event_streamer.cleanup_queue(user_id, namespace, workflow_id) + event_streamer.cleanup_queue(execution_id) return StreamingResponse( generate_inference_stream(), @@ -278,10 +270,6 @@ async def workflow_inference_v2( logger.info( f'Starting v2 inference for workflow_id: {workflow_id}, listen_events: {listen_events}' ) - - # Extract user_id from authenticated session - user_id = request.state.session.user_id - # Extract authentication credentials access_token, app_key = extract_auth_credentials(request) @@ -298,30 +286,26 @@ async def workflow_inference_v2( events_filter = None if listen_events or request_body.listen_events: - # Use real namespace and workflow name for event streaming + execution_id = str(uuid.uuid4()) event_callback = create_workflow_event_callback( - user_id, namespace, workflow_name + execution_id, namespace, workflow_name ) events_filter = DEFAULT_EVENTS_FILTER logger.info( - f'Event streaming enabled for user {user_id}, workflow {namespace}/{workflow_name}' + f'Event streaming enabled for execution {execution_id}, workflow {namespace}/{workflow_name}' ) # Check if streaming is requested if listen_events or request_body.listen_events: logger.info( - f'Streaming inference for user {user_id}, workflow {namespace}/{workflow_name}' + f'Streaming inference for execution {execution_id}, workflow {namespace}/{workflow_name}' ) - # Get or create event queue for this user-workflow - event_queue = event_streamer.get_or_create_queue( - user_id, namespace, workflow_name - ) + event_queue = event_streamer.create_queue(execution_id) async def generate_inference_stream(): """Generate streaming inference with events and final output""" try: - # Start inference in background task inference_task = asyncio.create_task( workflow_inference_service.perform_inference_v2( workflow_data=workflow_data, @@ -337,66 +321,54 @@ async def generate_inference_stream(): ) ) - # Stream events while workflow is running - workflow_completed = False - while not workflow_completed and not inference_task.done(): + # Stream events until inference completes + while not inference_task.done(): try: - # Wait for event with timeout event_data = await asyncio.wait_for( event_queue.get(), timeout=1.0 ) yield f'data: {json.dumps(event_data)}\n\n' - await asyncio.sleep(0.1) # remove it later - - # Check if workflow ended - if event_data.get('event_type') in [ - 'workflow_completed', - 'workflow_failed', - ]: - workflow_completed = True - except asyncio.TimeoutError: - # Continue waiting if no events continue - # Wait for inference to complete and get result + # Yield to the event loop so any ensure_future(add_event(...)) + # callbacks scheduled inside the inference task have a chance + # to run and enqueue their events before we drain. + await asyncio.sleep(0) + + # Drain any remaining events queued after task completion + while not event_queue.empty(): + event_data = event_queue.get_nowait() + yield f'data: {json.dumps(event_data)}\n\n' + result, execution_time = await inference_task - # Send final output event output_event = { 'event_type': 'output', 'result': result, 'workflow_id': workflow_name, 'namespace': namespace, 'execution_time': execution_time, - 'timestamp': asyncio.get_event_loop().time(), + 'timestamp': time.time(), } yield f'data: {json.dumps(output_event)}\n\n' - await asyncio.sleep(0.1) # remove it later logger.info( - f'Streaming inference completed for user {user_id}, workflow {namespace}/{workflow_name}' + f'Streaming inference completed for execution {execution_id}, workflow {namespace}/{workflow_name}' ) - except ValueError as e: - logger.error(f'Error in streaming inference: {e}') - error_event = { - 'event_type': 'error', - 'error': str(e), - 'timestamp': asyncio.get_event_loop().time(), - } - yield f'data: {json.dumps(error_event)}\n\n' except Exception as e: - logger.error(f'Error in streaming inference: {e}') + logger.error( + f'Error in streaming inference for execution {execution_id}: {e}' + ) error_event = { 'event_type': 'error', 'error': str(e), - 'timestamp': asyncio.get_event_loop().time(), + 'timestamp': time.time(), } yield f'data: {json.dumps(error_event)}\n\n' finally: - # Clean up queue - event_streamer.cleanup_queue(user_id, namespace, workflow_name) + event_streamer.cleanup_queue(execution_id) return StreamingResponse( generate_inference_stream(), diff --git a/wavefront/server/modules/agents_module/agents_module/models/workflow_schemas.py b/wavefront/server/modules/agents_module/agents_module/models/workflow_schemas.py index d48fc8b6..0d56a8dd 100644 --- a/wavefront/server/modules/agents_module/agents_module/models/workflow_schemas.py +++ b/wavefront/server/modules/agents_module/agents_module/models/workflow_schemas.py @@ -101,6 +101,9 @@ class WorkflowEventMessage(BaseModel): router_choice: Optional[str] = Field( None, description='Node chosen by router decision' ) + node_output: Optional[str] = Field( + None, description='Output produced by the node upon completion' + ) metadata: Optional[Dict[str, Any]] = Field( None, description='Additional event-specific data' ) diff --git a/wavefront/server/modules/agents_module/agents_module/services/workflow_events.py b/wavefront/server/modules/agents_module/agents_module/services/workflow_events.py index ad185e42..c55ac5e5 100644 --- a/wavefront/server/modules/agents_module/agents_module/services/workflow_events.py +++ b/wavefront/server/modules/agents_module/agents_module/services/workflow_events.py @@ -6,75 +6,36 @@ class WorkflowEventStreamer: - """Manager for HTTP streaming workflow events with user isolation using asyncio.Queue""" + """Manager for HTTP streaming workflow events, isolated per execution.""" def __init__(self): - # Store event queues by user-specific workflow key (user_id_namespace_workflow_id) self.event_queues: Dict[str, asyncio.Queue] = {} - def get_workflow_key(self, user_id: str, namespace: str, workflow_id: str) -> str: - """Generate unique key for user-specific workflow""" - return f'{user_id}_{namespace}_{workflow_id}' - - def get_or_create_queue( - self, user_id: str, namespace: str, workflow_id: str - ) -> asyncio.Queue: - """Get or create event queue for user-specific workflow""" - workflow_key = self.get_workflow_key(user_id, namespace, workflow_id) - - if workflow_key not in self.event_queues: - self.event_queues[workflow_key] = asyncio.Queue() - logger.info( - f'Created event queue for user {user_id}, workflow {namespace}/{workflow_id}' - ) - - return self.event_queues[workflow_key] - - async def add_event( - self, - user_id: str, - namespace: str, - workflow_id: str, - event_message: WorkflowEventMessage, - ): - """Add event to the queue for user-specific workflow""" - workflow_key = self.get_workflow_key(user_id, namespace, workflow_id) - - if workflow_key not in self.event_queues: - # Create queue if it doesn't exist (workflow started before streaming) - self.event_queues[workflow_key] = asyncio.Queue() - logger.info( - f'Created event queue for user {user_id}, workflow {namespace}/{workflow_id}' - ) + def create_queue(self, execution_id: str) -> asyncio.Queue: + queue: asyncio.Queue = asyncio.Queue() + self.event_queues[execution_id] = queue + logger.info(f'Created event queue for execution {execution_id}') + return queue + async def add_event(self, execution_id: str, event_message: WorkflowEventMessage): + queue = self.event_queues.get(execution_id) + if queue is None: + return try: - # Convert event message to dict for JSON serialization - event_dict = event_message.model_dump() - await self.event_queues[workflow_key].put(event_dict) - logger.debug( - f"Event queued for user {user_id}, workflow {namespace}/{workflow_id}: {event_dict['event_type']}" - ) + await queue.put(event_message.model_dump()) except Exception as e: - logger.error( - f'Error queuing event for user {user_id}, workflow {namespace}/{workflow_id}: {e}' - ) + logger.error(f'Error queuing event for execution {execution_id}: {e}') - def cleanup_queue(self, user_id: str, namespace: str, workflow_id: str): - """Remove event queue for user-specific workflow""" - workflow_key = self.get_workflow_key(user_id, namespace, workflow_id) - - if workflow_key in self.event_queues: - del self.event_queues[workflow_key] - logger.info( - f'Cleaned up event queue for user {user_id}, workflow {namespace}/{workflow_id}' - ) + def cleanup_queue(self, execution_id: str): + if execution_id in self.event_queues: + del self.event_queues[execution_id] + logger.info(f'Cleaned up event queue for execution {execution_id}') # Global event streamer instance event_streamer = WorkflowEventStreamer() -# Hardcoded events filter - listen to all event types DEFAULT_EVENTS_FILTER: List[AriumEventType] = [ AriumEventType.WORKFLOW_STARTED, AriumEventType.WORKFLOW_COMPLETED, @@ -88,26 +49,21 @@ def cleanup_queue(self, user_id: str, namespace: str, workflow_id: str): def create_workflow_event_callback( - user_id: str, namespace: str, workflow_id: str + execution_id: str, + namespace: str, + workflow_id: str, ) -> Callable[[AriumEvent], None]: """ - Create a hardcoded event callback function for user-specific HTTP streaming - - Args: - user_id: User ID from authenticated session - namespace: Workflow namespace - workflow_id: Workflow ID - - Returns: - Event callback function that queues events for HTTP streaming + Create an event callback scoped to a single execution_id so concurrent + runs of the same workflow never share a queue. """ + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None def event_callback(event: AriumEvent) -> None: - """ - Hardcoded callback that converts AriumEvent to WorkflowEventMessage and queues for HTTP streaming - """ try: - # Convert AriumEvent to WorkflowEventMessage event_message = WorkflowEventMessage( event_type=event.event_type.value, timestamp=event.timestamp, @@ -118,19 +74,28 @@ def event_callback(event: AriumEvent) -> None: execution_time=event.execution_time, error=event.error, router_choice=event.router_choice, + node_output=event.node_output, metadata=event.metadata, ) - - # Queue event for HTTP streaming (async operation, we'll queue it) - asyncio.create_task( - event_streamer.add_event(user_id, namespace, workflow_id, event_message) - ) - - logger.debug( - f'Workflow event queued: {event.event_type.value} for user {user_id}, workflow {namespace}/{workflow_id}' - ) - + running_loop = loop + try: + running_loop = asyncio.get_running_loop() + except RuntimeError: + pass + + if running_loop is not None: + asyncio.ensure_future( + event_streamer.add_event(execution_id, event_message), + loop=running_loop, + ) + else: + logger.warning( + f'No event loop available to queue event {event.event_type.value} ' + f'for execution {execution_id}' + ) except Exception as e: - logger.error(f'Error in workflow event callback for user {user_id}: {e}') + logger.error( + f'Error in workflow event callback for execution {execution_id}: {e}' + ) return event_callback diff --git a/wavefront/server/modules/agents_module/agents_module/services/workflow_inference_service.py b/wavefront/server/modules/agents_module/agents_module/services/workflow_inference_service.py index 8762e415..3d218f11 100644 --- a/wavefront/server/modules/agents_module/agents_module/services/workflow_inference_service.py +++ b/wavefront/server/modules/agents_module/agents_module/services/workflow_inference_service.py @@ -1,3 +1,4 @@ +import asyncio import time from typing import Any, Dict, List, Optional, Callable import yaml @@ -56,7 +57,7 @@ async def fetch_workflow_yaml(self, workflow_name: str, namespace: str) -> str: cache_key = get_workflow_yaml_cache_key(namespace, workflow_name) # Try to get from cache first - cached_result = self.cache_manager.get_str(cache_key) + cached_result = await asyncio.to_thread(self.cache_manager.get_str, cache_key) if cached_result: logger.info( f'Cache hit fetching workflow YAML for namespace: {namespace}, workflow: {workflow_name}' @@ -66,12 +67,14 @@ async def fetch_workflow_yaml(self, workflow_name: str, namespace: str) -> str: logger.info( f'Fetching workflow YAML for namespace: {namespace}, workflow: {workflow_name}' ) - yaml_bytes: bytes = self.cloud_storage_manager.read_file( - self.bucket_name, yaml_key + yaml_bytes: bytes = await asyncio.to_thread( + self.cloud_storage_manager.read_file, self.bucket_name, yaml_key ) yaml_content = yaml_bytes.decode('utf-8') - self.cache_manager.add(cache_key, yaml_content, expiry=3600) + await asyncio.to_thread( + self.cache_manager.add, cache_key, yaml_content, expiry=3600 + ) logger.info( f'Successfully fetched workflow YAML for namespace: {namespace}, workflow: {workflow_name}' diff --git a/wavefront/server/modules/agents_module/agents_module/utils/input_processing_utils.py b/wavefront/server/modules/agents_module/agents_module/utils/input_processing_utils.py index 0862965d..99854344 100644 --- a/wavefront/server/modules/agents_module/agents_module/utils/input_processing_utils.py +++ b/wavefront/server/modules/agents_module/agents_module/utils/input_processing_utils.py @@ -75,12 +75,23 @@ def process_inference_inputs( detail=f'Invalid base64 image data: {e}', ) elif is_doc_message(input_content): - # DocumentMessage - append directly + # Inject filename as a text message before the document so + # agents can reference the original file name in their output. + file_name = input_content.get('file_name') + if file_name: + resolved_inputs.append( + UserMessage( + content=TextMessageContent( + text=f'The original filename of this document is: {file_name}' + ) + ) + ) resolved_inputs.append( UserMessage( content=DocumentMessageContent( base64=input_content.get('document_base64'), mime_type=input_content.get('mime_type'), + url=input_content.get('document_url'), ) ) ) diff --git a/wavefront/server/modules/auth_module/auth_module/auth_container.py b/wavefront/server/modules/auth_module/auth_module/auth_container.py index 1737a03f..d18c0d0b 100644 --- a/wavefront/server/modules/auth_module/auth_module/auth_container.py +++ b/wavefront/server/modules/auth_module/auth_module/auth_container.py @@ -37,8 +37,12 @@ class AuthContainer(containers.DeclarativeContainer): db_client=db_client, ) - kms_service = providers.Singleton( - FloKmsService, cloud_provider=config.cloud_config.cloud_provider + kms_service = providers.Selector( + config.jwt_token.enable_cloud_kms, + true=providers.Singleton( + FloKmsService, cloud_provider=config.cloud_config.cloud_provider + ), + false=providers.Object(None), ) token_service = providers.Singleton(