
feature for agent outputs in events & streaming fixes #280

Merged
vizsatiz merged 6 commits into develop from feature/output_in_events
Apr 23, 2026
Conversation

@vizsatiz
Member

@vizsatiz vizsatiz commented Apr 23, 2026

Summary by CodeRabbit

  • New Features

    • Workflow events now include node output, showing what each completed step returns
    • Document filenames are preserved and included in inference requests across the system
  • Chores

    • Cloud KMS service configuration made conditional based on feature enablement

@coderabbitai

coderabbitai Bot commented Apr 23, 2026

Caution

Review failed

Pull request was closed or merged during review

📝 Walkthrough

The PR adds a node_output field across workflow event layers, serializing node execution results from backend through to frontend UI. Event queue isolation is refactored to use execution-scoped IDs instead of user/workflow combinations. Documents now include file_name fields, blocking I/O operations are offloaded to background threads, and cloud KMS service provisioning is made conditional.

Changes

• Node Output Tracking
  Files: flo_ai/flo_ai/arium/arium.py, flo_ai/flo_ai/arium/events.py, wavefront/client/src/types/workflow.ts, wavefront/client/src/types/chat-message.ts, wavefront/client/src/components/Stream.tsx, wavefront/server/modules/agents_module/.../models/workflow_schemas.py
  Adds an optional node_output field to event models and serializes node execution results. The backend serializes output via a _serialize_node_output() method handling various content types (None, strings, lists, objects with .content, document-like structures). Frontend types are extended to carry node_output, and the Stream component renders it as a formatted "Output:" section.

• Event Queue Refactoring & Execution Isolation
  Files: wavefront/server/modules/agents_module/.../controllers/workflow_controller.py, wavefront/server/modules/agents_module/.../services/workflow_events.py
  Refactors event queue isolation from user/workflow-based keys to execution-scoped IDs. Introduces explicit queue lifecycle APIs (create_queue, cleanup_queue). Event streaming now uses a per-request execution_id instead of user_id. Updates the SSE generator to drain remaining events after task completion and removes the prior termination detection. Changes event timestamps from asyncio.get_event_loop().time() to time.time().

• Cloud KMS Conditional Configuration
  Files: wavefront/server/apps/floconsole/.../application_container.py, wavefront/server/modules/auth_module/.../auth_container.py
  Makes kms_service optional and conditionally provided based on config.jwt_token.enable_cloud_kms: when enabled, provides a singleton FloKmsService; when disabled, provides None. The token service remains wired to receive kms_service, with new null-aware behavior.

• Document Metadata Enhancement
  Files: wavefront/client/src/components/InferencePopup.tsx, wavefront/client/src/pages/apps/[appId]/agents/[id].tsx, wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx, wavefront/server/modules/agents_module/.../input_processing_utils.py
  Adds a file_name field (from the uploaded document's File.name) to document payloads. The backend conditionally prepends the user text message with the filename and includes a url field in the DocumentMessageContent payload alongside the existing base64 and mime_type fields.

• Async I/O Optimization
  Files: wavefront/server/modules/agents_module/.../workflow_inference_service.py
  Offloads potentially blocking cache and cloud storage operations (cache_manager.get_str, cache_manager.add, cloud_storage_manager.read_file) onto background threads via await asyncio.to_thread(...), preserving the existing cache-hit/return flow while avoiding blocking the event loop.
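The offloading pattern described above can be sketched as follows. This is a minimal, self-contained illustration of await asyncio.to_thread(...); blocking_read stands in for the PR's cache/storage calls (cache_manager.get_str etc.), and its name is illustrative, not from the codebase.

```python
import asyncio
import time


def blocking_read(key: str) -> str:
    """Hypothetical blocking call standing in for a cache or storage read."""
    time.sleep(0.05)  # simulate blocking I/O
    return f"value-for-{key}"


async def fetch(key: str) -> str:
    # Run the blocking call on a worker thread so the event loop stays
    # free to service other coroutines (e.g. SSE streaming).
    return await asyncio.to_thread(blocking_read, key)


async def main() -> None:
    # Both reads overlap instead of serializing on the loop thread.
    results = await asyncio.gather(fetch("a"), fetch("b"))
    print(results)


if __name__ == "__main__":
    asyncio.run(main())
```

Because asyncio.to_thread returns an awaitable, the surrounding cache-hit/return control flow can stay unchanged, which matches the summary's claim.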

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~40 minutes

Possibly related PRs

Poem

🐰 A rabbit hops through events new,
With node outputs now shining through!
Queue isolation spins on IDs so bright,
File names attached—documents in flight!
Async threads dance, no blockage in sight.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 60.00%, which is below the required 80.00% threshold. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
  • Description Check: ✅ Passed. Check skipped because CodeRabbit’s high-level summary is enabled.
  • Title Check: ✅ Passed. The title captures the two main aspects of the changeset, adding node outputs to events and fixing streaming infrastructure, though the second part is somewhat vague.
  • Linked Issues Check: ✅ Passed. Check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes Check: ✅ Passed. Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


@vizsatiz vizsatiz changed the title feature output in events. feature for agent outputs in events & streaming fixes Apr 23, 2026

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 5

🧹 Nitpick comments (8)
wavefront/server/modules/agents_module/agents_module/controllers/workflow_controller.py (1)

90-105: Queue creation races callback invocation.

create_workflow_event_callback(...) is built on line 91 and may capture events before event_streamer.create_queue(execution_id) is called on line 105. In the current control flow this is safe because the inference task is not started until after create_queue, but this ordering is fragile: if a future refactor moves callback wiring earlier or fires a synchronous pre-event, add_event will silently drop events because the queue isn't registered yet.

Creating the queue right after generating execution_id (before constructing the callback) would remove this foot-gun. Same applies to the v2 path at lines 283–298.

♻️ Proposed reordering
     if listen_events or request_body.listen_events:
         execution_id = str(uuid.uuid4())
+        event_queue = event_streamer.create_queue(execution_id)
         event_callback = create_workflow_event_callback(
             execution_id, namespace, workflow_id
         )
         events_filter = DEFAULT_EVENTS_FILTER
         ...

     # Check if streaming is requested
     if listen_events or request_body.listen_events:
         logger.info(
             f'Streaming inference for execution {execution_id}, workflow {namespace}/{workflow_id}'
         )
-
-        event_queue = event_streamer.create_queue(execution_id)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/agents_module/agents_module/controllers/workflow_controller.py`
around lines 90 - 105, The callback wiring can capture events before its queue
is registered; after generating execution_id (the execution_id variable), call
event_streamer.create_queue(execution_id) immediately before creating the
callback (create_workflow_event_callback) so the queue is guaranteed to exist
when the callback calls add_event; apply the same reorder in the v2 path that
uses create_workflow_event_callback and event_streamer.create_queue (the block
around the v2 handler lines referencing execution_id and
create_workflow_event_callback) to ensure queues are created before callbacks
are constructed.
wavefront/server/modules/agents_module/agents_module/services/workflow_events.py (2)

14-18: Silent overwrite on duplicate execution_id.

create_queue unconditionally assigns self.event_queues[execution_id] = queue. If (due to a caller bug or future refactor) the same execution_id is used twice, the first queue — potentially still draining events — is silently orphaned. A guard would surface the mis-use loudly.

♻️ Proposed guard
     def create_queue(self, execution_id: str) -> asyncio.Queue:
+        if execution_id in self.event_queues:
+            logger.warning(
+                f'Event queue for execution {execution_id} already exists; overwriting'
+            )
         queue: asyncio.Queue = asyncio.Queue()
         self.event_queues[execution_id] = queue
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/agents_module/agents_module/services/workflow_events.py`
around lines 14 - 18, create_queue currently unconditionally overwrites
self.event_queues[execution_id], silently orphaning an existing queue; update
the create_queue method to check if execution_id already exists in
self.event_queues and, if so, log an error (using logger.error) and raise an
explicit exception (e.g., ValueError or RuntimeError) instead of creating a new
queue, otherwise proceed to create and assign the queue as before; reference the
create_queue function and the self.event_queues dict when making this change.

60-64: Captured loop at callback-creation time is effectively dead code.

loop is captured on lines 60-63, then at lines 80-84 the callback re-queries asyncio.get_running_loop() at invocation time and overwrites running_loop. If the re-query fails, we fall back to the captured loop — but if the callback fires outside any running loop (the only case where the capture would matter), loop.call_soon_threadsafe(...) or similar is what you'd actually need; asyncio.ensure_future(..., loop=loop) on a non-running loop will not execute.

Either drop the initial capture (simpler), or switch to loop.call_soon_threadsafe(asyncio.ensure_future, ...) when the callback executes on a non-loop thread.
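The second option above can be sketched as follows. This is an illustrative, self-contained example of the call_soon_threadsafe strategy, not the actual workflow_events.py code; add_event, make_callback, and results are hypothetical names.

```python
import asyncio
import threading

results: list[str] = []


async def add_event(payload: str) -> None:
    """Stand-in for the event streamer's add_event coroutine."""
    results.append(payload)


def make_callback(loop: asyncio.AbstractEventLoop):
    """Capture the loop at creation time; use it only when firing off-thread."""
    def callback(payload: str) -> None:
        try:
            # Fast path: we are already on a loop thread.
            asyncio.get_running_loop()
            asyncio.ensure_future(add_event(payload))
        except RuntimeError:
            # Off-loop thread: hand the coroutine to the captured loop.
            # ensure_future(..., loop=loop) would NOT run on a non-running
            # loop from here; call_soon_threadsafe is the safe bridge.
            loop.call_soon_threadsafe(asyncio.ensure_future, add_event(payload))
    return callback


async def main() -> None:
    cb = make_callback(asyncio.get_running_loop())
    worker = threading.Thread(target=cb, args=("from-thread",))
    worker.start()
    worker.join()
    await asyncio.sleep(0.01)  # give the scheduled coroutine a chance to run
    print(results)


asyncio.run(main())
```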

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/agents_module/agents_module/services/workflow_events.py`
around lines 60 - 64, The captured variable loop (set via
asyncio.get_running_loop() at callback-creation) is dead code because the
callback re-queries asyncio.get_running_loop() at invocation and overwrites
running_loop; fix by removing the pre-capture and always attempt
asyncio.get_running_loop() at invocation, or if you want to support the callback
firing off-thread keep the captured loop and change the invocation path to use
loop.call_soon_threadsafe(asyncio.ensure_future, coro) when
asyncio.get_running_loop() raises; update the code paths around the captured
loop, running_loop, asyncio.get_running_loop() and asyncio.ensure_future to
follow one of these two consistent strategies.
wavefront/client/src/types/workflow.ts (1)

105-110: Redundant node_output re-declaration on NodeCompletedEvent.

WorkflowEventBase already declares node_output?: string (line 82), so re-declaring it on NodeCompletedEvent (line 109) is redundant. Keeping it only on the base (or only narrowing it here if you intend to make it required for completed events) would be cleaner.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/client/src/types/workflow.ts` around lines 105 - 110, The
NodeCompletedEvent interface currently re-declares node_output which is already
defined on WorkflowEventBase; remove the redundant node_output?: string from
NodeCompletedEvent (or, if completed events must always include output, change
it to node_output: string to narrow the type) so the declaration is only on
WorkflowEventBase unless intentional narrowing is required; update
NodeCompletedEvent accordingly and ensure TypeScript types still compile for
usages of NodeCompletedEvent and WorkflowEventBase.
flo_ai/flo_ai/arium/events.py (1)

30-52: Docstring missing node_output description.

The AriumEvent class docstring (lines 33-41) documents all fields but doesn't mention the new node_output attribute. Consider adding a line so the schema is self-documenting.

📝 Proposed docstring update
         router_choice: The node chosen by a router decision
+        node_output: Serialized output produced by the node upon completion
         metadata: Additional event-specific data
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/arium/events.py` around lines 30 - 52, The AriumEvent docstring
is missing documentation for the new field node_output; update the class
docstring for AriumEvent to add a short description line for node_output (e.g.,
"node_output: Output produced by the node, if any") so the schema is
self-documenting and matches the declared attributes; ensure the new line is
placed with the other attribute descriptions and uses the same style as existing
entries.
flo_ai/flo_ai/arium/arium.py (1)

502-511: Consider bounding the serialized output size in events.

Large node outputs (e.g., long LLM responses, multi-MB tool outputs) are now serialized and shipped through every event pipeline (SSE payloads, WebSocket, logs). This can significantly increase per-event memory/bandwidth and may stress SSE clients. Consider truncating node_output at a reasonable size (with an ellipsis marker) either inside _serialize_node_output or at event-emission time, with the full result still available in memory for the actual workflow return.
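A bounded-size serialization could look like the sketch below. truncate_output is a hypothetical helper illustrating the suggestion, not part of the PR; the 4000-character cap is an arbitrary example value.

```python
from typing import Optional


def truncate_output(text: Optional[str], max_chars: int = 4000) -> Optional[str]:
    """Bound event payload size, marking truncation; None passes through."""
    if text is None or len(text) <= max_chars:
        return text
    return text[:max_chars] + "… [truncated]"


print(truncate_output("short answer"))
print(truncate_output("x" * 10_000)[-12:])  # ends with the truncation marker
```

Applying this at event-emission time (rather than inside _serialize_node_output) keeps the full result available for the workflow's return value while only the bounded string travels over SSE/WebSocket.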

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/arium/arium.py` around lines 502 - 511, The NODE_COMPLETED
event is including full serialized node outputs which can be very large;
truncate node_output before emitting to avoid huge SSE/WebSocket payloads by
bounding the serialized size (e.g., max_chars or max_bytes) and appending an
ellipsis marker when truncated. Implement this either inside
_serialize_node_output (add an optional max_size param or internal truncation)
or immediately before calling _emit_event in the NODE_COMPLETED emission path
(truncate the string returned by _serialize_node_output), and ensure the full
result remains available in memory for workflow return but only the truncated
string is sent in the event (reference symbols: _serialize_node_output,
_emit_event, AriumEventType.NODE_COMPLETED, node_output).
wavefront/server/modules/auth_module/auth_module/auth_container.py (2)

45-52: Make the nullable KMS contract explicit in auth TokenService.

Line 45 now intentionally injects None, but auth_module.services.token_service.TokenService.__init__ still declares kms_service: FloKMS. Mirror the floconsole service’s FloKMS | None contract so this DI state is explicit rather than relying on the current is_dev side effect.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/modules/auth_module/auth_module/auth_container.py` around
lines 45 - 52, The DI currently injects None for kms_service in auth_container
but TokenService.__init__ still types kms_service as FloKMS; update
TokenService.__init__ signature to accept FloKMS | None (or Optional[FloKMS])
and adjust any internal usage to handle a None kms_service safely, mirroring the
floconsole contract; ensure the provider in auth_container (the
providers.Singleton(TokenService, kms_service=...)) remains compatible with the
new nullable type.

40-45: Make selector key matching more robust to environment variable formats.

The providers.Selector at line 40-46 matches keyword argument names (true, false) against the resolved value of config.jwt_token.enable_cloud_kms. Per documentation, this env var should be set to the string "true" or "false", which will work correctly. However, if the environment provides alternate boolean-like values (e.g., "1", "0", "yes", "no", capitalized variants), the selector will fail to find a matching branch. Consider normalizing the config value to ensure reliable resolution:

Suggested normalization
     kms_service = providers.Selector(
-        config.jwt_token.enable_cloud_kms,
+        providers.Callable(
+            lambda enabled: (
+                'true'
+                if str(enabled).strip().lower() in {'1', 'true', 'yes', 'on'}
+                else 'false'
+            ),
+            config.jwt_token.enable_cloud_kms,
+        ),
         true=providers.Singleton(
             FloKmsService, cloud_provider=config.cloud_config.cloud_provider
         ),
         false=providers.Object(None),
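The normalization logic in the suggestion can be exercised standalone. normalize_bool_str below is an illustrative helper, not a name from the codebase; it maps assorted boolean-like values onto the exact 'true'/'false' keys that providers.Selector branches on.

```python
def normalize_bool_str(value) -> str:
    """Map boolean-like env values onto the Selector keys 'true'/'false'."""
    return (
        "true"
        if str(value).strip().lower() in {"1", "true", "yes", "on"}
        else "false"
    )


for raw in (True, "True", "1", "yes", "0", "no", "False"):
    print(f"{raw!r} -> {normalize_bool_str(raw)}")
```

Anything outside the truthy set falls through to 'false', so unrecognized values disable cloud KMS rather than crashing Selector resolution.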
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/modules/auth_module/auth_module/auth_container.py` around
lines 40 - 45, The Selector kms_service uses raw
config.jwt_token.enable_cloud_kms keys like true/false which can fail for values
like "1","yes","True"; normalize the config value to a canonical boolean-string
before passing to providers.Selector (e.g., map "1","true","yes" -> "true" and
"0","false","no" -> "false"), then feed that normalized value into
providers.Selector so the existing branches (true ->
providers.Singleton(FloKmsService, ...), false -> providers.Object(None))
reliably resolve; update the code that constructs kms_service to use the
normalized value and keep the same branch identifiers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@flo_ai/flo_ai/arium/arium.py`:
- Around line 644-661: The _serialize_node_output function can return non-str
types from attributes like content.text and result.text, violating the declared
Optional[str] return; update _serialize_node_output to coerce any non-None,
non-str branch to a string (e.g., wrap content.text and result.text with
str(...) and ensure list comprehension results are coerced to strings before
joining) while preserving None propagation, and add a short docstring noting the
coercion; target the function named _serialize_node_output and the branches
handling list, hasattr(result, 'content') (content.text) and hasattr(result,
'text') (result.text).

In `@wavefront/server/apps/floconsole/floconsole/di/application_container.py`:
- Around line 69-74: The selector for kms_service uses
config.jwt_token.enable_cloud_kms (a string) and may not match "true"/"false"
exactly; normalize that value the same way used in the auth container before
passing it to providers.Selector — e.g., convert
config.jwt_token.enable_cloud_kms to a canonical "true" or "false" (handle
"True","1","yes","on" -> "true" and others -> "false") and then call
providers.Selector with the normalized value so the true branch
(providers.Singleton(FloKmsService, ...)) and false branch
(providers.Object(None)) reliably resolve.

In
`@wavefront/server/modules/agents_module/agents_module/controllers/workflow_controller.py`:
- Around line 127-141: The final drain loop can race with event callbacks
scheduled via asyncio.ensure_future(add_event(...)) in workflow_events.py
causing late events to be dropped; update the stream handlers around
inference_task (the block checking inference_task.done() and draining
event_queue) to wait briefly for scheduled callbacks to run before final
draining—either collect and await pending callback futures or insert a short
await asyncio.sleep(0) (or bounded loop like several short sleeps) prior to the
while not event_queue.empty() drain, and ensure cleanup_queue is called after
this bounded wait so add_event coroutines have a chance to enqueue their events.
- Around line 149-166: The timestamps for streaming events are using
asyncio.get_running_loop().time() (monotonic loop time) which breaks client
rendering; add import time at the top of the module and replace all uses of
asyncio.get_running_loop().time() in the streaming paths (where output_event and
error_event are created inside the streaming inference logic in
workflow_controller.py) with time.time() so the events use Unix epoch seconds
consistent with AriumEvent.timestamp and WorkflowEventMessage.timestamp; update
each occurrence that constructs output_event and error_event to call time.time()
instead.

In
`@wavefront/server/modules/agents_module/agents_module/services/workflow_events.py`:
- Around line 86-95: The call to asyncio.ensure_future in
create_workflow_event_callback discards the Task and can be garbage-collected;
fix by keeping a strong reference: add a module- or instance-level set (e.g.,
pending_event_tasks) to store Tasks created for event_streamer.add_event, assign
the result of asyncio.ensure_future(...) to a variable, add a done callback that
removes the Task from pending_event_tasks when complete, and insert the Task
into pending_event_tasks only when running_loop is not None (use the same
ensure_future call with loop=running_loop). Ensure references to running_loop,
event_streamer.add_event, and create_workflow_event_callback are updated to use
this tracking set so tasks are not dropped.
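The strong-reference pattern that prompt describes can be sketched as follows. This is an illustrative, self-contained example, not the workflow_events.py code; pending_event_tasks, schedule_event, and add_event are hypothetical names mirroring the prompt's suggestion.

```python
import asyncio

# Strong references keep scheduled Tasks alive; a bare
# asyncio.ensure_future(...) whose result is discarded may be
# garbage-collected before it runs.
pending_event_tasks: set[asyncio.Task] = set()
received: list[str] = []


async def add_event(payload: str) -> None:
    """Stand-in for the event streamer's add_event coroutine."""
    received.append(payload)


def schedule_event(payload: str) -> None:
    task = asyncio.ensure_future(add_event(payload))
    pending_event_tasks.add(task)  # hold a strong reference
    # Drop the reference once the task completes.
    task.add_done_callback(pending_event_tasks.discard)


async def main() -> None:
    schedule_event("node_completed")
    # Before cleanup (e.g. cleanup_queue), drain whatever is still pending.
    await asyncio.gather(*pending_event_tasks)
    print(received)


asyncio.run(main())
```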


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0600f713-e81a-443e-87fe-05269976cb7b

📥 Commits

Reviewing files that changed from the base of the PR and between 904e73e and 11056f3.

📒 Files selected for processing (10)
  • flo_ai/flo_ai/arium/arium.py
  • flo_ai/flo_ai/arium/events.py
  • wavefront/client/src/components/Stream.tsx
  • wavefront/client/src/types/workflow.ts
  • wavefront/server/apps/floconsole/floconsole/di/application_container.py
  • wavefront/server/modules/agents_module/agents_module/controllers/workflow_controller.py
  • wavefront/server/modules/agents_module/agents_module/models/workflow_schemas.py
  • wavefront/server/modules/agents_module/agents_module/services/workflow_events.py
  • wavefront/server/modules/agents_module/agents_module/services/workflow_inference_service.py
  • wavefront/server/modules/auth_module/auth_module/auth_container.py

Comment on lines +644 to +661
def _serialize_node_output(self, result: Any) -> Optional[str]:
    if result is None:
        return None
    if isinstance(result, str):
        return result
    if isinstance(result, list):
        parts = [self._serialize_node_output(item) for item in result]
        return '\n'.join(p for p in parts if p) or None
    if hasattr(result, 'content'):
        content = result.content
        if isinstance(content, str):
            return content
        if hasattr(content, 'text'):
            return content.text
        return str(content)
    if hasattr(result, 'text'):
        return result.text
    return str(result)

⚠️ Potential issue | 🟡 Minor

_serialize_node_output may return non-string values, violating the declared return type.

The signature declares Optional[str], but several branches can return a non-str:

  • Line 657: return content.text, where text is not guaranteed to be a str on arbitrary objects.
  • Line 660: return result.text — same concern.

If a downstream consumer (e.g., WorkflowEventMessage.node_output: Optional[str] in workflow_schemas.py) calls json.dumps(event_data) or Pydantic validation on it, a non-string attribute could raise at runtime or silently propagate unexpected types into event payloads.

🛡️ Proposed coercion + brief docstring
     def _serialize_node_output(self, result: Any) -> Optional[str]:
+        """Serialize a node result to a string for event telemetry, or None if empty."""
         if result is None:
             return None
         if isinstance(result, str):
             return result
         if isinstance(result, list):
             parts = [self._serialize_node_output(item) for item in result]
             return '\n'.join(p for p in parts if p) or None
         if hasattr(result, 'content'):
             content = result.content
             if isinstance(content, str):
                 return content
             if hasattr(content, 'text'):
-                return content.text
+                return str(content.text) if content.text is not None else None
             return str(content)
         if hasattr(result, 'text'):
-            return result.text
+            return str(result.text) if result.text is not None else None
         return str(result)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flo_ai/flo_ai/arium/arium.py` around lines 644 - 661, The
_serialize_node_output function can return non-str types from attributes like
content.text and result.text, violating the declared Optional[str] return;
update _serialize_node_output to coerce any non-None, non-str branch to a string
(e.g., wrap content.text and result.text with str(...) and ensure list
comprehension results are coerced to strings before joining) while preserving
None propagation, and add a short docstring noting the coercion; target the
function named _serialize_node_output and the branches handling list,
hasattr(result, 'content') (content.text) and hasattr(result, 'text')
(result.text).

Comment on lines +69 to +74
kms_service = providers.Selector(
    config.jwt_token.enable_cloud_kms,
    true=providers.Singleton(
        FloKmsService, cloud_provider=config.cloud_config.cloud_provider
    ),
    false=providers.Object(None),

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

python - <<'PY'
try:
    from dependency_injector import providers
except ModuleNotFoundError:
    print("dependency_injector is not installed in this sandbox; run this in the project environment.")
    raise SystemExit(0)

for selector_value in (True, False, "true", "false", "True", "False", "1", "0"):
    selector = providers.Selector(
        providers.Object(selector_value),
        true=providers.Object("kms"),
        false=providers.Object(None),
    )
    try:
        print(f"{selector_value!r} -> {selector()!r}")
    except Exception as exc:
        print(f"{selector_value!r} -> ERROR: {type(exc).__name__}: {exc}")
PY

rg -n -C2 'enable_cloud_kms|kms_service = providers\.Selector' .

Repository: rootflo/wavefront

Length of output: 2402


🏁 Script executed:

#!/bin/bash

# Find config loading logic
echo "=== Searching for config loading and enable_cloud_kms handling ==="
rg -n 'enable_cloud_kms|jwt_token|pydantic' --type py wavefront/server/apps/floconsole/floconsole/ | head -30

# Check the config schema/model definition
echo -e "\n=== Looking for config model definitions ==="
fd -e py 'config' wavefront/server/apps/floconsole/ | xargs grep -l 'enable_cloud_kms' 2>/dev/null | head -5

Repository: rootflo/wavefront

Length of output: 1897


🏁 Script executed:

#!/bin/bash

# Find how config is parsed - look for pydantic models or config classes
echo "=== Searching for config model/class definitions ==="
rg -A5 'class.*Config|enable_cloud_kms' wavefront/server/apps/floconsole/floconsole/ -t py | head -50

# Check if jwt_token is a pydantic model
echo -e "\n=== Finding jwt_token class/model ==="
rg -B2 -A10 'class.*JwtToken|jwt_token.*=' wavefront/server/apps/floconsole/ -t py | head -60

Repository: rootflo/wavefront

Length of output: 1265


🏁 Script executed:

#!/bin/bash

# Look for tests that might show actual behavior with Selector
echo "=== Searching for tests of application_container ==="
fd -e py test wavefront/server/apps/floconsole/ 2>/dev/null | xargs grep -l 'Selector\|kms_service' 2>/dev/null

# Check auth_container for comparison
echo -e "\n=== Checking auth_container context ==="
cat -n wavefront/server/modules/auth_module/auth_module/auth_container.py | sed -n '35,50p'

# Check if there are any error logs or GitHub issues mentioning this
echo -e "\n=== Checking for any FloKmsService error handling ==="
rg -B3 -A3 'FloKmsService|kms_service' wavefront/server/apps/floconsole/floconsole/services/ -t py

Repository: rootflo/wavefront

Length of output: 5656


🏁 Script executed:

#!/bin/bash

# Look for the config initialization in application_container
echo "=== Full application_container.py to see config setup ==="
cat -n wavefront/server/apps/floconsole/floconsole/di/application_container.py | head -80

# Search for config class/model definition
echo -e "\n=== Searching for config model in floconsole ==="
fd -e py wavefront/server/apps/floconsole/floconsole/ | xargs grep -l 'class.*Config\|JwtToken' 2>/dev/null

# Check imports in application_container
echo -e "\n=== Checking imports in application_container ==="
head -30 wavefront/server/apps/floconsole/floconsole/di/application_container.py

Repository: rootflo/wavefront

Length of output: 4758


🏁 Script executed:

#!/bin/bash

# Look for config models more broadly
echo "=== Searching for JwtToken/Config class definitions ==="
rg 'class JwtToken|class.*Config.*:' wavefront/server/ -t py -B1 -A10 | head -80

# Check how config is instantiated
echo -e "\n=== Searching for config = ... or load_config ==="
rg 'config.*=|load_config|from_env|Config\(' wavefront/server/apps/floconsole/floconsole/di/ -t py -B2 -A2

Repository: rootflo/wavefront

Length of output: 10148


🏁 Script executed:

#!/bin/bash

# Look for the actual config definition module
echo "=== Checking for config module in floconsole ==="
fd -e py -path '*/floconsole/*' 'config' 2>/dev/null | head -20

# Try searching for config class with enable_cloud_kms field
echo -e "\n=== Searching for enable_cloud_kms field definition ==="
rg -B5 -A5 'enable_cloud_kms.*:' wavefront/server/ -t py

# Check the actual yaml/ini/toml config files
echo -e "\n=== Checking config files ==="
find wavefront/server/apps/floconsole -name '*.ini' -o -name '*.yaml' -o -name '*.yml' -o -name '*.toml' | xargs cat -n 2>/dev/null | grep -C3 'enable_cloud_kms'

Repository: rootflo/wavefront

Length of output: 541


Normalize the selector value to ensure it matches "true" or "false" keys.

Line 70: config.jwt_token.enable_cloud_kms is a string value from the INI configuration (parsed from the ${ENABLE_CLOUD_KMS} environment variable). The providers.Selector performs exact key matching on strings. If the environment variable is set to "True", "1", "yes", or any non-standard format, the selector will fail to match either the true or false branch, causing runtime initialization failure.

Apply the same normalization used in the auth container:

Proposed normalization
     kms_service = providers.Selector(
-        config.jwt_token.enable_cloud_kms,
+        providers.Callable(
+            lambda enabled: (
+                'true'
+                if str(enabled).strip().lower() in {'1', 'true', 'yes', 'on'}
+                else 'false'
+            ),
+            config.jwt_token.enable_cloud_kms,
+        ),
         true=providers.Singleton(
             FloKmsService, cloud_provider=config.cloud_config.cloud_provider
         ),
         false=providers.Object(None),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    kms_service = providers.Selector(
-        config.jwt_token.enable_cloud_kms,
-        true=providers.Singleton(
-            FloKmsService, cloud_provider=config.cloud_config.cloud_provider
-        ),
-        false=providers.Object(None),
+    kms_service = providers.Selector(
+        providers.Callable(
+            lambda enabled: (
+                'true'
+                if str(enabled).strip().lower() in {'1', 'true', 'yes', 'on'}
+                else 'false'
+            ),
+            config.jwt_token.enable_cloud_kms,
+        ),
+        true=providers.Singleton(
+            FloKmsService, cloud_provider=config.cloud_config.cloud_provider
+        ),
+        false=providers.Object(None),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/apps/floconsole/floconsole/di/application_container.py`
around lines 69 - 74, The selector for kms_service uses
config.jwt_token.enable_cloud_kms (a string) and may not match "true"/"false"
exactly; normalize that value the same way used in the auth container before
passing it to providers.Selector — e.g., convert
config.jwt_token.enable_cloud_kms to a canonical "true" or "false" (handle
"True","1","yes","on" -> "true" and others -> "false") and then call
providers.Selector with the normalized value so the true branch
(providers.Singleton(FloKmsService, ...)) and false branch
(providers.Object(None)) reliably resolve.
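Outside the container, the normalization the review asks for amounts to a one-line mapping. `normalize_flag` is a hypothetical name for illustration; the key set matches the suggestion above.

```python
def normalize_flag(value) -> str:
    """Map common truthy spellings to the Selector keys 'true'/'false'."""
    return (
        'true'
        if str(value).strip().lower() in {'1', 'true', 'yes', 'on'}
        else 'false'
    )
```

Any value outside the truthy set, including None or an empty string, normalizes to 'false', so the Selector always matches one of its two branches.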

Comment on lines +86 to +95
if running_loop is not None:
    asyncio.ensure_future(
        event_streamer.add_event(execution_id, event_message),
        loop=running_loop,
    )
else:
    logger.warning(
        f'No event loop available to queue event {event.event_type.value} '
        f'for execution {execution_id}'
    )

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🌐 Web query:

Python asyncio ensure_future task garbage collection weakref

💡 Result:

In Python's asyncio, tasks created with asyncio.ensure_future or asyncio.create_task are only held by weak references in the event loop (specifically, in _all_tasks as a WeakSet). This design prevents memory leaks from long-lived references to completed tasks in fire-and-forget scenarios but requires users to maintain strong references to ensure tasks complete reliably. If no strong reference exists (e.g., not storing the task object), the task can be garbage collected prematurely if it's not actively executing or blocked on a strongly referenced future. This triggers the warning "Task was destroyed but it is pending!" and stops execution mid-way. The event loop tracks tasks weakly via weakref.WeakSet to allow GC when no strong refs remain. To prevent premature GC: - Store tasks in a set/list: background_tasks = set; task = asyncio.create_task(coro); background_tasks.add(task); task.add_done_callback(background_tasks.discard) - Use asyncio.TaskGroup (Python 3.11+) for structured concurrency, which holds strong refs internally. - For ensure_future specifically (deprecated in favor of create_task), same rules apply. Official docs emphasize: "Save a reference to the result of this function, to avoid a task disappearing mid-execution. The event loop only keeps weak references to tasks." This behavior is consistent across Python 3.10+ (current as of 2026), with source code confirming WeakSet usage in tasks.py.


🏁 Script executed:

find . -name "workflow_events.py" -type f

Repository: rootflo/wavefront

Length of output: 143


🏁 Script executed:

# Once found, examine the file structure
wc -l wavefront/server/modules/agents_module/agents_module/services/workflow_events.py

Repository: rootflo/wavefront

Length of output: 145


🏁 Script executed:

# Read the relevant section and surrounding context
cat -n wavefront/server/modules/agents_module/agents_module/services/workflow_events.py | sed -n '1,110p'

Repository: rootflo/wavefront

Length of output: 4192


asyncio.ensure_future(...) return value is discarded — task may be garbage-collected before completion.

Per Python docs and RUF006, asyncio only keeps weak references to tasks; a task created via ensure_future without a strong reference can be garbage-collected mid-execution, silently dropping events under memory pressure. This is especially problematic in the streaming inference path where event loss is user-visible.

🛡️ Proposed fix — track pending tasks

Add a set to store task references and discard on completion:

 class WorkflowEventStreamer:
     """Manager for HTTP streaming workflow events, isolated per execution."""

     def __init__(self):
         self.event_queues: Dict[str, asyncio.Queue] = {}
+        self._pending_tasks: set[asyncio.Task] = set()

Then in the callback within create_workflow_event_callback (lines 86-90):

             if running_loop is not None:
-                asyncio.ensure_future(
+                task = asyncio.ensure_future(
                     event_streamer.add_event(execution_id, event_message),
                     loop=running_loop,
                 )
+                event_streamer._pending_tasks.add(task)
+                task.add_done_callback(event_streamer._pending_tasks.discard)
🧰 Tools
🪛 Ruff (0.15.10)

[warning] 87-90: Store a reference to the return value of asyncio.ensure_future

(RUF006)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/agents_module/agents_module/services/workflow_events.py`
around lines 86 - 95, The call to asyncio.ensure_future in
create_workflow_event_callback discards the Task and can be garbage-collected;
fix by keeping a strong reference: add a module- or instance-level set (e.g.,
pending_event_tasks) to store Tasks created for event_streamer.add_event, assign
the result of asyncio.ensure_future(...) to a variable, add a done callback that
removes the Task from pending_event_tasks when complete, and insert the Task
into pending_event_tasks only when running_loop is not None (use the same
ensure_future call with loop=running_loop). Ensure references to running_loop,
event_streamer.add_event, and create_workflow_event_callback are updated to use
this tracking set so tasks are not dropped.
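The strong-reference pattern the fix describes can be demonstrated with plain asyncio. The names (`pending_tasks`, `schedule`) are illustrative, not the module's; the mechanics (store the task, discard on completion) are the same.

```python
import asyncio

pending_tasks: set[asyncio.Task] = set()


def schedule(coro) -> asyncio.Task:
    """Create a task and hold a strong reference until it completes."""
    task = asyncio.ensure_future(coro)
    pending_tasks.add(task)
    # The done-callback releases the reference once the task finishes.
    task.add_done_callback(pending_tasks.discard)
    return task


async def main() -> list[int]:
    results: list[int] = []

    async def work(i: int) -> None:
        await asyncio.sleep(0)
        results.append(i)

    for i in range(3):
        schedule(work(i))
    # Drain whatever is still pending; yield once so callbacks run.
    while pending_tasks:
        await asyncio.gather(*list(pending_tasks))
        await asyncio.sleep(0)
    return results


print(sorted(asyncio.run(main())))  # prints [0, 1, 2]
```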

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
wavefront/server/apps/floware/floware/server.py (1)

574-583: ⚠️ Potential issue | 🟠 Major

Keep Floware’s local port aligned with existing consumers.

Line 577 moves the dev server to 8004, but Floware is still referenced as localhost:8001 by floware_base_url, docker-compose.sample.yml, the local-app client default, and the security-header test docs. 8004 is also already used by call_processing, so running both locally will conflict.

Either keep Floware on 8001, or make the port configurable and update all references together.

🐛 Minimal fix: restore the existing Floware dev port
         uvicorn.run(
             'server:app',
             host='0.0.0.0',
-            port=8004,
+            port=8001,
             workers=1,
             reload=True,
             reload_includes=dirs,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/apps/floware/floware/server.py` around lines 574 - 583, The
dev server port was changed to 8004 in the uvicorn.run call which conflicts with
call_processing and is inconsistent with other references (floware_base_url,
docker-compose.sample.yml, local-app client default, security-header tests);
revert the port argument in uvicorn.run back to 8001 (or alternatively make the
port configurable via an env var and update all references) so Floware’s local
port aligns with existing consumers—update the uvicorn.run(...) call in
server.py (and if choosing configurability, add env var handling and replace
hard-coded 8004 with that variable and update all referenced configs).
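If the configurable-port route is chosen instead of reverting to 8001, the lookup can be kept defensive. The env var name `FLOWARE_PORT` and the helper name are assumptions for illustration; the default matches the port the review says existing consumers expect.

```python
import os


def resolve_port(default: int = 8001) -> int:
    """Read the dev-server port from the environment, falling back safely."""
    raw = os.getenv('FLOWARE_PORT', str(default))
    try:
        return int(raw)
    except ValueError:
        # A malformed value falls back to the known-good default.
        return default
```

This would replace the hard-coded port in `uvicorn.run(...)`, with the same env var documented in docker-compose and the client defaults.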
🧹 Nitpick comments (1)
wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx (1)

390-399: Redundant file_name alongside metadata.filename.

file_name and metadata.filename carry the same value from doc.file.name. The server only reads the top-level file_name (see input_processing_utils.py line 80), so metadata.filename is effectively dead weight on the wire. Not blocking — keeping both matches the pattern in agents/[id].tsx and InferencePopup.tsx — but worth consolidating across all three client sites in a follow-up to avoid future drift between the two fields.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/client/src/pages/apps/`[appId]/workflows/[id].tsx around lines 390
- 399, The payload includes duplicate filename fields: top-level file_name and
metadata.filename both set from doc.file.name; remove metadata.filename from the
documentMessage object (leave other metadata entries like size) so the client
only sends the server-consumed top-level file_name, and update any related
construction using documentMessage/doc.file.name to avoid reintroducing the
duplicate; apply the same consolidation later in the other places that build
documentMessage to keep consistency.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@wavefront/server/modules/agents_module/agents_module/utils/input_processing_utils.py`:
- Around line 78-88: The code injects file_name from the client directly into a
UserMessage (see input_processing_utils.py where file_name is used to build a
TextMessageContent appended to resolved_inputs), which is an in-band prompt
injection risk; fix it by sanitizing file_name before use: strip control
characters and newlines, limit length (e.g. truncate to a safe max), and wrap
the sanitized value in explicit delimiters (e.g. quotes or backticks) before
embedding; alternatively move the sanitized filename into DocumentMessageContent
metadata instead of a UserMessage so it’s not treated as free-form instruction —
update the logic that constructs the UserMessage/TextMessageContent to use the
sanitized/truncated/quoted value or to attach it as metadata.
- Around line 89-97: The code is currently always setting
url=input_content.get('document_url') when building DocumentMessageContent,
which allows URL-only inputs to flow to providers; change the logic in the block
that appends to resolved_inputs (the UserMessage/DocumentMessageContent
construction using input_content) to require and validate that either
document_base64 or bytes is present before accepting a document, and only
populate the url field conditionally (e.g., set url only if
document_base64/bytes are absent and you explicitly allow URL-fallback), or else
reject/raise/log the input; update the validation so downstream provider
formatters never receive a URL unless explicit fallback is intended.

---

Outside diff comments:
In `@wavefront/server/apps/floware/floware/server.py`:
- Around line 574-583: The dev server port was changed to 8004 in the
uvicorn.run call which conflicts with call_processing and is inconsistent with
other references (floware_base_url, docker-compose.sample.yml, local-app client
default, security-header tests); revert the port argument in uvicorn.run back to
8001 (or alternatively make the port configurable via an env var and update all
references) so Floware’s local port aligns with existing consumers—update the
uvicorn.run(...) call in server.py (and if choosing configurability, add env var
handling and replace hard-coded 8004 with that variable and update all
referenced configs).

---

Nitpick comments:
In `@wavefront/client/src/pages/apps/`[appId]/workflows/[id].tsx:
- Around line 390-399: The payload includes duplicate filename fields: top-level
file_name and metadata.filename both set from doc.file.name; remove
metadata.filename from the documentMessage object (leave other metadata entries
like size) so the client only sends the server-consumed top-level file_name, and
update any related construction using documentMessage/doc.file.name to avoid
reintroducing the duplicate; apply the same consolidation later in the other
places that build documentMessage to keep consistency.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a592ad7f-fee7-4b93-bcf1-badb657802fd

📥 Commits

Reviewing files that changed from the base of the PR and between 11056f3 and 8a5b719.

📒 Files selected for processing (7)
  • flo_ai/flo_ai/arium/arium.py
  • wavefront/client/src/components/InferencePopup.tsx
  • wavefront/client/src/pages/apps/[appId]/agents/[id].tsx
  • wavefront/client/src/pages/apps/[appId]/workflows/[id].tsx
  • wavefront/client/src/types/chat-message.ts
  • wavefront/server/apps/floware/floware/server.py
  • wavefront/server/modules/agents_module/agents_module/utils/input_processing_utils.py
✅ Files skipped from review due to trivial changes (3)
  • wavefront/client/src/pages/apps/[appId]/agents/[id].tsx
  • wavefront/client/src/types/chat-message.ts
  • wavefront/client/src/components/InferencePopup.tsx
🚧 Files skipped from review as they are similar to previous changes (1)
  • flo_ai/flo_ai/arium/arium.py

Comment on lines +78 to +88
# Inject filename as a text message before the document so
# agents can reference the original file name in their output.
file_name = input_content.get('file_name')
if file_name:
    resolved_inputs.append(
        UserMessage(
            content=TextMessageContent(
                text=f'The original filename of this document is: {file_name}'
            )
        )
    )

⚠️ Potential issue | 🟡 Minor

Filename is injected verbatim into the prompt — consider sanitizing.

`file_name` comes straight from the client payload (`doc.file.name`) and is concatenated into a natural-language instruction sent to the model. A crafted filename like `ignore previous instructions.\n\nNew task: ...pdf` becomes an in-band prompt-injection vector that now flows into every agent invocation that receives a document.

At minimum, consider:

  • stripping newlines/control chars and truncating length,
  • quoting the value (e.g. wrapping in backticks or "…") so it's visually delimited from the instruction,
  • and/or attaching it as metadata on DocumentMessageContent rather than a separate UserMessage, if the downstream model prompt can surface it from there.
Suggested minimal hardening (the `re.sub` call assumes `import re` at the top of the module)
-                    file_name = input_content.get('file_name')
-                    if file_name:
+                    file_name = input_content.get('file_name')
+                    if file_name:
+                        # Guard against prompt-injection via crafted filenames.
+                        safe_name = re.sub(r'[\r\n\t]+', ' ', str(file_name))[:255]
                         resolved_inputs.append(
                             UserMessage(
                                 content=TextMessageContent(
-                                    text=f'The original filename of this document is: {file_name}'
+                                    text=f'The original filename of this document is: "{safe_name}"'
                                 )
                             )
                         )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/agents_module/agents_module/utils/input_processing_utils.py`
around lines 78 - 88, The code injects file_name from the client directly into a
UserMessage (see input_processing_utils.py where file_name is used to build a
TextMessageContent appended to resolved_inputs), which is an in-band prompt
injection risk; fix it by sanitizing file_name before use: strip control
characters and newlines, limit length (e.g. truncate to a safe max), and wrap
the sanitized value in explicit delimiters (e.g. quotes or backticks) before
embedding; alternatively move the sanitized filename into DocumentMessageContent
metadata instead of a UserMessage so it’s not treated as free-form instruction —
update the logic that constructs the UserMessage/TextMessageContent to use the
sanitized/truncated/quoted value or to attach it as metadata.
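The sanitization steps named above (strip control characters, truncate, delimit) can be sketched as a standalone helper; the function name and the 255-character cap are illustrative choices, not the repository's code.

```python
import re


def sanitize_filename_for_prompt(file_name, max_len: int = 255) -> str:
    """Collapse control characters and truncate before prompt embedding."""
    # Newlines, tabs, and carriage returns become single spaces so the
    # filename cannot break out of its sentence in the prompt.
    safe = re.sub(r'[\r\n\t]+', ' ', str(file_name)).strip()
    return safe[:max_len]
```

The caller would then embed the result in explicit delimiters, e.g. `f'The original filename of this document is: "{safe_name}"'`.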

Comment on lines 89 to 97
resolved_inputs.append(
    UserMessage(
        content=DocumentMessageContent(
            base64=input_content.get('document_base64'),
            mime_type=input_content.get('mime_type'),
            url=input_content.get('document_url'),
        )
    )
)

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Inspect DocumentMessageContent formatting logic to see how url vs base64 precedence is resolved.
fd -t f 'chat_message.py' | xargs rg -n -C3 'class DocumentMessageContent|_formatted_cache|def format|url|base64'
rg -nP -C3 'DocumentMessageContent\s*\(' --type=py

Repository: rootflo/wavefront

Length of output: 7469


🏁 Script executed:

# Search for provider formatters that handle DocumentMessageContent
rg -n 'class.*Provider|def.*format.*content|DocumentMessageContent' --type=py -A5 | head -150

Repository: rootflo/wavefront

Length of output: 15359


🏁 Script executed:

# Look for any formatting or message handling logic specific to DocumentMessageContent
fd -t f -e py | xargs rg -l 'DocumentMessageContent' | head -20

Repository: rootflo/wavefront

Length of output: 599


🏁 Script executed:

# Search for any validation or precedence logic
rg -n 'base64.*url|url.*base64' --type=py -C2

Repository: rootflo/wavefront

Length of output: 15748


🏁 Script executed:

# Get complete format_document_in_message methods from LLM providers
rg -n 'def format_document_in_message' -A30 --type=py

Repository: rootflo/wavefront

Length of output: 7742


🏁 Script executed:

# Check Anthropic's document handling specifically
sed -n '250,310p' flo_ai/flo_ai/llm/anthropic_llm.py

Repository: rootflo/wavefront

Length of output: 2259


🏁 Script executed:

# Check Gemini's document handling
sed -n '275,295p' flo_ai/flo_ai/llm/gemini_llm.py

Repository: rootflo/wavefront

Length of output: 829


🏁 Script executed:

# Check base LLM document handling  
sed -n '200,220p' flo_ai/flo_ai/llm/base_llm.py

Repository: rootflo/wavefront

Length of output: 742


Verify that document_url is intended as a fallback source for documents.

The code unconditionally wires in url=input_content.get('document_url') alongside base64=input_content.get('document_base64'). While downstream provider formatters correctly prioritize base64 over url when both are present (Anthropic and Gemini check base64 first; base formatter rejects urls entirely with "URL-based documents are not supported"), the concern is if only document_url is provided without document_base64, providers will attempt to fetch from that URL. This could expose the system to stale URLs or unintended remote fetches if client sends an outdated URL. Consider whether the code should validate that at least one of document_base64 or bytes is present, or if document_url should only be set conditionally.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@wavefront/server/modules/agents_module/agents_module/utils/input_processing_utils.py`
around lines 89 - 97, The code is currently always setting
url=input_content.get('document_url') when building DocumentMessageContent,
which allows URL-only inputs to flow to providers; change the logic in the block
that appends to resolved_inputs (the UserMessage/DocumentMessageContent
construction using input_content) to require and validate that either
document_base64 or bytes is present before accepting a document, and only
populate the url field conditionally (e.g., set url only if
document_base64/bytes are absent and you explicitly allow URL-fallback), or else
reject/raise/log the input; update the validation so downstream provider
formatters never receive a URL unless explicit fallback is intended.
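The validation the prompt describes can be sketched as a small builder that prefers inline data and gates the URL path behind an explicit flag. Names (`build_document_fields`, `allow_url_fallback`) are hypothetical, not the module's API.

```python
def build_document_fields(input_content: dict,
                          allow_url_fallback: bool = False) -> dict:
    """Validate document inputs, preferring inline data over remote URLs."""
    base64_data = input_content.get('document_base64')
    url = input_content.get('document_url')
    if base64_data:
        # Inline data wins; never forward a URL alongside it.
        return {'base64': base64_data, 'url': None,
                'mime_type': input_content.get('mime_type')}
    if url and allow_url_fallback:
        # URL-only documents pass through only when explicitly allowed.
        return {'base64': None, 'url': url,
                'mime_type': input_content.get('mime_type')}
    raise ValueError(
        'document requires document_base64 (or an explicitly allowed URL)'
    )
```

With this shape, downstream provider formatters never see a URL unless the caller opted into the fallback, matching the intent of the review comment.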

vishnurk6247 previously approved these changes Apr 23, 2026
@vizsatiz vizsatiz merged commit b57f780 into develop Apr 23, 2026
8 of 9 checks passed
@vizsatiz vizsatiz deleted the feature/output_in_events branch April 23, 2026 09:30
thomastomy5 pushed a commit that referenced this pull request Apr 27, 2026
* Output in events

* Reverting port chaneg

* fix code review

* Document name fix

* fix for code review comments

* fix port issues