Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions flo_ai/flo_ai/arium/arium.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ async def _execute_node(
node_name=node.name,
node_type=node_type,
execution_time=execution_time,
node_output=self._serialize_node_output(result),
)

return result
Expand Down Expand Up @@ -561,6 +562,7 @@ async def _execute_node(
node_name=node.name,
node_type=node_type,
execution_time=execution_time,
node_output=self._serialize_node_output(result),
)

return result
Expand Down Expand Up @@ -638,3 +640,25 @@ def _add_to_memory(self, message: MessageMemoryItem):
Store message in memory
"""
self.memory.add(message)

def _serialize_node_output(self, result: Any) -> Optional[str]:
if result is None:
return None
if isinstance(result, str):
return result
if isinstance(result, list):
parts = [self._serialize_node_output(item) for item in result]
return '\n'.join(p for p in parts if p) or None
if hasattr(result, 'content'):
return self._serialize_node_output(result.content)
if hasattr(result, 'text'):
return result.text
# DocumentMessageContent / ImageMessageContent — show url or type label
media_type = getattr(result, 'type', None)
if media_type in ('document', 'image'):
url = getattr(result, 'url', None)
mime = getattr(result, 'mime_type', None)
if url:
return f'[{media_type}: {url}]'
return f'[{media_type}{f": {mime}" if mime else ""}]'
return str(result)
Comment on lines +644 to +664
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

_serialize_node_output may return non-string values, violating the declared return type.

The signature declares Optional[str], but several branches can return a non-str:

  • Line 657: return content.texttext is not guaranteed to be a str on arbitrary objects.
  • Line 660: return result.text — same concern.

If a downstream consumer (e.g., WorkflowEventMessage.node_output: Optional[str] in workflow_schemas.py) calls json.dumps(event_data) or Pydantic validation on it, a non-string attribute could raise at runtime or silently propagate unexpected types into event payloads.

🛡️ Proposed coercion + brief docstring
     def _serialize_node_output(self, result: Any) -> Optional[str]:
+        """Serialize a node result to a string for event telemetry, or None if empty."""
         if result is None:
             return None
         if isinstance(result, str):
             return result
         if isinstance(result, list):
             parts = [self._serialize_node_output(item) for item in result]
             return '\n'.join(p for p in parts if p) or None
         if hasattr(result, 'content'):
             content = result.content
             if isinstance(content, str):
                 return content
             if hasattr(content, 'text'):
-                return content.text
+                return str(content.text) if content.text is not None else None
             return str(content)
         if hasattr(result, 'text'):
-            return result.text
+            return str(result.text) if result.text is not None else None
         return str(result)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/arium/arium.py` around lines 644 - 661, The
_serialize_node_output function can return non-str types from attributes like
content.text and result.text, violating the declared Optional[str] return;
update _serialize_node_output to coerce any non-None, non-str branch to a string
(e.g., wrap content.text and result.text with str(...) and ensure list
comprehension results are coerced to strings before joining) while preserving
None propagation, and add a short docstring noting the coercion; target the
function named _serialize_node_output and the branches handling list,
hasattr(result, 'content') (content.text) and hasattr(result, 'text')
(result.text).

1 change: 1 addition & 0 deletions flo_ai/flo_ai/arium/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class AriumEvent:
execution_time: Optional[float] = None
error: Optional[str] = None
router_choice: Optional[str] = None
node_output: Optional[str] = None
metadata: Optional[dict] = None


Expand Down
1 change: 1 addition & 0 deletions wavefront/client/src/components/InferencePopup.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ const InferencePopup: React.FC<InferencePopupProps> = ({ onClose, renderModal =
document_type: doc.documentType,
document_base64: doc.base64Content,
mime_type: doc.mimeType,
file_name: doc.file.name,
metadata: {
filename: doc.file.name,
size: doc.file.size,
Expand Down
7 changes: 7 additions & 0 deletions wavefront/client/src/components/Stream.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ interface StreamProps {
execution_time?: number;
error?: string;
router_choice?: string;
node_output?: string;
}>;
isStreaming?: boolean;
eventsContainerRef?: RefObject<HTMLDivElement | null>;
Expand Down Expand Up @@ -74,6 +75,12 @@ const Stream: React.FC<StreamProps> = ({ listenEventsEnabled, streamingEvents, i
{'router_choice' in event && event.router_choice && (
<div className="mt-1 text-blue-600">Router choice: {event.router_choice}</div>
)}
{'node_output' in event && event.node_output && (
<div className="mt-1 rounded border border-gray-200 bg-white p-2">
<span className="font-medium text-gray-500">Output: </span>
<span className="break-words whitespace-pre-wrap text-gray-700">{event.node_output}</span>
</div>
)}
</div>
))}
</div>
Expand Down
1 change: 1 addition & 0 deletions wavefront/client/src/pages/apps/[appId]/agents/[id].tsx
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ const AgentDetail: React.FC = () => {
document_type: doc.documentType,
document_base64: doc.base64Content,
mime_type: doc.mimeType,
file_name: doc.file.name,
metadata: {
filename: doc.file.name,
size: doc.file.size,
Expand Down
1 change: 1 addition & 0 deletions wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ const WorkflowDetail: React.FC = () => {
document_type: doc.documentType,
document_base64: doc.base64Content,
mime_type: doc.mimeType,
file_name: doc.file.name,
metadata: {
filename: doc.file.name,
size: doc.file.size,
Expand Down
1 change: 1 addition & 0 deletions wavefront/client/src/types/chat-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export interface DocumentContent {
document_type: string;
document_base64?: string;
mime_type?: string;
file_name?: string;
metadata?: {
filename?: string;
size?: number;
Expand Down
2 changes: 2 additions & 0 deletions wavefront/client/src/types/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export interface WorkflowEventBase {
execution_time?: number;
error?: string;
router_choice?: string;
node_output?: string;
metadata?: Record<string, unknown>;
}

Expand All @@ -105,6 +106,7 @@ export interface NodeCompletedEvent extends WorkflowEventBase {
event_type: 'node_completed';
node_name: string;
execution_time: number;
node_output?: string;
}

export interface NodeFailedEvent extends WorkflowEventBase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,12 @@ class ApplicationContainer(containers.DeclarativeContainer):
app_user_repository=app_user_repository,
)

kms_service = providers.Singleton(
FloKmsService, cloud_provider=config.cloud_config.cloud_provider
kms_service = providers.Selector(
config.jwt_token.enable_cloud_kms,
true=providers.Singleton(
FloKmsService, cloud_provider=config.cloud_config.cloud_provider
),
false=providers.Object(None),
Comment on lines +69 to +74
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

python - <<'PY'
try:
    from dependency_injector import providers
except ModuleNotFoundError:
    print("dependency_injector is not installed in this sandbox; run this in the project environment.")
    raise SystemExit(0)

for selector_value in (True, False, "true", "false", "True", "False", "1", "0"):
    selector = providers.Selector(
        providers.Object(selector_value),
        true=providers.Object("kms"),
        false=providers.Object(None),
    )
    try:
        print(f"{selector_value!r} -> {selector()!r}")
    except Exception as exc:
        print(f"{selector_value!r} -> ERROR: {type(exc).__name__}: {exc}")
PY

rg -n -C2 'enable_cloud_kms|kms_service = providers\.Selector' .

Repository: rootflo/wavefront

Length of output: 2402


🏁 Script executed:

#!/bin/bash

# Find config loading logic
echo "=== Searching for config loading and enable_cloud_kms handling ==="
rg -n 'enable_cloud_kms|jwt_token|pydantic' --type py wavefront/server/apps/floconsole/floconsole/ | head -30

# Check the config schema/model definition
echo -e "\n=== Looking for config model definitions ==="
fd -e py 'config' wavefront/server/apps/floconsole/ | xargs grep -l 'enable_cloud_kms' 2>/dev/null | head -5

Repository: rootflo/wavefront

Length of output: 1897


🏁 Script executed:

#!/bin/bash

# Find how config is parsed - look for pydantic models or config classes
echo "=== Searching for config model/class definitions ==="
rg -A5 'class.*Config|enable_cloud_kms' wavefront/server/apps/floconsole/floconsole/ -t py | head -50

# Check if jwt_token is a pydantic model
echo -e "\n=== Finding jwt_token class/model ==="
rg -B2 -A10 'class.*JwtToken|jwt_token.*=' wavefront/server/apps/floconsole/ -t py | head -60

Repository: rootflo/wavefront

Length of output: 1265


🏁 Script executed:

#!/bin/bash

# Look for tests that might show actual behavior with Selector
echo "=== Searching for tests of application_container ==="
fd -e py test wavefront/server/apps/floconsole/ 2>/dev/null | xargs grep -l 'Selector\|kms_service' 2>/dev/null

# Check auth_container for comparison
echo -e "\n=== Checking auth_container context ==="
cat -n wavefront/server/modules/auth_module/auth_module/auth_container.py | sed -n '35,50p'

# Check if there are any error logs or GitHub issues mentioning this
echo -e "\n=== Checking for any FloKmsService error handling ==="
rg -B3 -A3 'FloKmsService|kms_service' wavefront/server/apps/floconsole/floconsole/services/ -t py

Repository: rootflo/wavefront

Length of output: 5656


🏁 Script executed:

#!/bin/bash

# Look for the config initialization in application_container
echo "=== Full application_container.py to see config setup ==="
cat -n wavefront/server/apps/floconsole/floconsole/di/application_container.py | head -80

# Search for config class/model definition
echo -e "\n=== Searching for config model in floconsole ==="
fd -e py wavefront/server/apps/floconsole/floconsole/ | xargs grep -l 'class.*Config\|JwtToken' 2>/dev/null

# Check imports in application_container
echo -e "\n=== Checking imports in application_container ==="
head -30 wavefront/server/apps/floconsole/floconsole/di/application_container.py

Repository: rootflo/wavefront

Length of output: 4758


🏁 Script executed:

#!/bin/bash

# Look for config models more broadly
echo "=== Searching for JwtToken/Config class definitions ==="
rg 'class JwtToken|class.*Config.*:' wavefront/server/ -t py -B1 -A10 | head -80

# Check how config is instantiated
echo -e "\n=== Searching for config = ... or load_config ==="
rg 'config.*=|load_config|from_env|Config\(' wavefront/server/apps/floconsole/floconsole/di/ -t py -B2 -A2

Repository: rootflo/wavefront

Length of output: 10148


🏁 Script executed:

#!/bin/bash

# Look for the actual config definition module
echo "=== Checking for config module in floconsole ==="
fd -e py -path '*/floconsole/*' 'config' 2>/dev/null | head -20

# Try searching for config class with enable_cloud_kms field
echo -e "\n=== Searching for enable_cloud_kms field definition ==="
rg -B5 -A5 'enable_cloud_kms.*:' wavefront/server/ -t py

# Check the actual yaml/ini/toml config files
echo -e "\n=== Checking config files ==="
find wavefront/server/apps/floconsole -name '*.ini' -o -name '*.yaml' -o -name '*.yml' -o -name '*.toml' | xargs cat -n 2>/dev/null | grep -C3 'enable_cloud_kms'

Repository: rootflo/wavefront

Length of output: 541


Normalize the selector value to ensure it matches "true" or "false" keys.

Line 70: config.jwt_token.enable_cloud_kms is a string value from the INI configuration (parsed from the ${ENABLE_CLOUD_KMS} environment variable). The providers.Selector performs exact key matching on strings. If the environment variable is set to "True", "1", "yes", or any non-standard format, the selector will fail to match either the true or false branch, causing runtime initialization failure.

Apply the same normalization used in the auth container:

Proposed normalization
     kms_service = providers.Selector(
-        config.jwt_token.enable_cloud_kms,
+        providers.Callable(
+            lambda enabled: (
+                'true'
+                if str(enabled).strip().lower() in {'1', 'true', 'yes', 'on'}
+                else 'false'
+            ),
+            config.jwt_token.enable_cloud_kms,
+        ),
         true=providers.Singleton(
             FloKmsService, cloud_provider=config.cloud_config.cloud_provider
         ),
         false=providers.Object(None),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
kms_service = providers.Selector(
config.jwt_token.enable_cloud_kms,
true=providers.Singleton(
FloKmsService, cloud_provider=config.cloud_config.cloud_provider
),
false=providers.Object(None),
kms_service = providers.Selector(
providers.Callable(
lambda enabled: (
'true'
if str(enabled).strip().lower() in {'1', 'true', 'yes', 'on'}
else 'false'
),
config.jwt_token.enable_cloud_kms,
),
true=providers.Singleton(
FloKmsService, cloud_provider=config.cloud_config.cloud_provider
),
false=providers.Object(None),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/apps/floconsole/floconsole/di/application_container.py`
around lines 69 - 74, The selector for kms_service uses
config.jwt_token.enable_cloud_kms (a string) and may not match "true"/"false"
exactly; normalize that value the same way used in the auth container before
passing it to providers.Selector — e.g., convert
config.jwt_token.enable_cloud_kms to a canonical "true" or "false" (handle
"True","1","yes","on" -> "true" and others -> "false") and then call
providers.Selector with the normalized value so the true branch
(providers.Singleton(FloKmsService, ...)) and false branch
(providers.Object(None)) reliably resolve.

)

token_service = providers.Singleton(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from dependency_injector.wiring import inject, Provide
import json
import asyncio
import uuid
import time

from common_module.log.logger import logger
from common_module.response_formatter import ResponseFormatter
Expand Down Expand Up @@ -75,9 +77,6 @@ async def workflow_inference(
f'Starting inference for namespace: {namespace}, workflow_id: {workflow_id}, listen_events: {listen_events}'
)

# Extract user_id from authenticated session
user_id = request.state.session.user_id

# Extract authentication credentials
access_token, app_key = extract_auth_credentials(request)

Expand All @@ -89,27 +88,26 @@ async def workflow_inference(
events_filter = None

if listen_events or request_body.listen_events:
event_callback = create_workflow_event_callback(user_id, namespace, workflow_id)
execution_id = str(uuid.uuid4())
event_callback = create_workflow_event_callback(
execution_id, namespace, workflow_id
)
events_filter = DEFAULT_EVENTS_FILTER
logger.info(
f'Event streaming enabled for user {user_id}, workflow {namespace}/{workflow_id}'
f'Event streaming enabled for execution {execution_id}, workflow {namespace}/{workflow_id}'
)

# Check if streaming is requested
if listen_events or request_body.listen_events:
logger.info(
f'Streaming inference for user {user_id}, workflow {namespace}/{workflow_id}'
f'Streaming inference for execution {execution_id}, workflow {namespace}/{workflow_id}'
)

# Get or create event queue for this user-workflow
event_queue = event_streamer.get_or_create_queue(
user_id, namespace, workflow_id
)
event_queue = event_streamer.create_queue(execution_id)

async def generate_inference_stream():
"""Generate streaming inference with events and final output"""
try:
# Start inference in background task
inference_task = asyncio.create_task(
workflow_inference_service.perform_inference(
workflow_name=workflow_id,
Expand All @@ -126,60 +124,54 @@ async def generate_inference_stream():
)
)

# Stream events while workflow is running
workflow_completed = False
while not workflow_completed and not inference_task.done():
# Stream events until inference completes
while not inference_task.done():
try:
# Wait for event with timeout
event_data = await asyncio.wait_for(
event_queue.get(), timeout=1.0
)
yield f'data: {json.dumps(event_data)}\n\n'
await asyncio.sleep(0.1) # remove it later

# Check if workflow ended
if event_data.get('event_type') in [
'workflow_completed',
'workflow_failed',
]:
workflow_completed = True

except asyncio.TimeoutError:
# Continue waiting if no events
continue

# Wait for inference to complete and get result
# Yield to the event loop so any ensure_future(add_event(...))
# callbacks scheduled inside the inference task have a chance
# to run and enqueue their events before we drain.
await asyncio.sleep(0)

# Drain any remaining events queued after task completion
while not event_queue.empty():
event_data = event_queue.get_nowait()
yield f'data: {json.dumps(event_data)}\n\n'

result, execution_time = await inference_task
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# Send final output event
output_event = {
'event_type': 'output',
'result': result,
'workflow_id': workflow_id,
'namespace': namespace,
'execution_time': execution_time,
'timestamp': asyncio.get_event_loop().time(),
'timestamp': time.time(),
}
yield f'data: {json.dumps(output_event)}\n\n'
await asyncio.sleep(0.1) # remove it later

logger.info(
f'Streaming inference completed for user {user_id}, workflow {namespace}/{workflow_id}'
f'Streaming inference completed for execution {execution_id}, workflow {namespace}/{workflow_id}'
)

except Exception as e:
logger.error(
f'Error in streaming inference for user {user_id}, workflow {namespace}/{workflow_id}: {e}'
f'Error in streaming inference for execution {execution_id}, workflow {namespace}/{workflow_id}: {e}'
)
error_event = {
'event_type': 'error',
'error': str(e),
'timestamp': asyncio.get_event_loop().time(),
'timestamp': time.time(),
}
yield f'data: {json.dumps(error_event)}\n\n'
finally:
# Clean up queue
event_streamer.cleanup_queue(user_id, namespace, workflow_id)
event_streamer.cleanup_queue(execution_id)

return StreamingResponse(
generate_inference_stream(),
Expand Down Expand Up @@ -278,10 +270,6 @@ async def workflow_inference_v2(
logger.info(
f'Starting v2 inference for workflow_id: {workflow_id}, listen_events: {listen_events}'
)

# Extract user_id from authenticated session
user_id = request.state.session.user_id

# Extract authentication credentials
access_token, app_key = extract_auth_credentials(request)

Expand All @@ -298,30 +286,26 @@ async def workflow_inference_v2(
events_filter = None

if listen_events or request_body.listen_events:
# Use real namespace and workflow name for event streaming
execution_id = str(uuid.uuid4())
event_callback = create_workflow_event_callback(
user_id, namespace, workflow_name
execution_id, namespace, workflow_name
)
events_filter = DEFAULT_EVENTS_FILTER
logger.info(
f'Event streaming enabled for user {user_id}, workflow {namespace}/{workflow_name}'
f'Event streaming enabled for execution {execution_id}, workflow {namespace}/{workflow_name}'
)

# Check if streaming is requested
if listen_events or request_body.listen_events:
logger.info(
f'Streaming inference for user {user_id}, workflow {namespace}/{workflow_name}'
f'Streaming inference for execution {execution_id}, workflow {namespace}/{workflow_name}'
)

# Get or create event queue for this user-workflow
event_queue = event_streamer.get_or_create_queue(
user_id, namespace, workflow_name
)
event_queue = event_streamer.create_queue(execution_id)

async def generate_inference_stream():
"""Generate streaming inference with events and final output"""
try:
# Start inference in background task
inference_task = asyncio.create_task(
workflow_inference_service.perform_inference_v2(
workflow_data=workflow_data,
Expand All @@ -337,66 +321,54 @@ async def generate_inference_stream():
)
)

# Stream events while workflow is running
workflow_completed = False
while not workflow_completed and not inference_task.done():
# Stream events until inference completes
while not inference_task.done():
try:
# Wait for event with timeout
event_data = await asyncio.wait_for(
event_queue.get(), timeout=1.0
)
yield f'data: {json.dumps(event_data)}\n\n'
await asyncio.sleep(0.1) # remove it later

# Check if workflow ended
if event_data.get('event_type') in [
'workflow_completed',
'workflow_failed',
]:
workflow_completed = True

except asyncio.TimeoutError:
# Continue waiting if no events
continue

# Wait for inference to complete and get result
# Yield to the event loop so any ensure_future(add_event(...))
# callbacks scheduled inside the inference task have a chance
# to run and enqueue their events before we drain.
await asyncio.sleep(0)

# Drain any remaining events queued after task completion
while not event_queue.empty():
event_data = event_queue.get_nowait()
yield f'data: {json.dumps(event_data)}\n\n'

result, execution_time = await inference_task

# Send final output event
output_event = {
'event_type': 'output',
'result': result,
'workflow_id': workflow_name,
'namespace': namespace,
'execution_time': execution_time,
'timestamp': asyncio.get_event_loop().time(),
'timestamp': time.time(),
}
yield f'data: {json.dumps(output_event)}\n\n'
await asyncio.sleep(0.1) # remove it later

logger.info(
f'Streaming inference completed for user {user_id}, workflow {namespace}/{workflow_name}'
f'Streaming inference completed for execution {execution_id}, workflow {namespace}/{workflow_name}'
)

except ValueError as e:
logger.error(f'Error in streaming inference: {e}')
error_event = {
'event_type': 'error',
'error': str(e),
'timestamp': asyncio.get_event_loop().time(),
}
yield f'data: {json.dumps(error_event)}\n\n'
except Exception as e:
logger.error(f'Error in streaming inference: {e}')
logger.error(
f'Error in streaming inference for execution {execution_id}: {e}'
)
error_event = {
'event_type': 'error',
'error': str(e),
'timestamp': asyncio.get_event_loop().time(),
'timestamp': time.time(),
}
yield f'data: {json.dumps(error_event)}\n\n'
finally:
# Clean up queue
event_streamer.cleanup_queue(user_id, namespace, workflow_name)
event_streamer.cleanup_queue(execution_id)

return StreamingResponse(
generate_inference_stream(),
Expand Down
Loading
Loading