
fix: memory access in arium - foreach node and arium node #167

Merged
vishnurk6247 merged 4 commits into develop from fix/for-each-node-memory on Nov 22, 2025

Conversation

@vishnurk6247
Member

@vishnurk6247 vishnurk6247 commented Nov 22, 2025

Summary by CodeRabbit

  • Refactor

    • Standardized message and memory handling across routing and execution; routers now use message-based memory and asynchronous patterns, and node execution normalizes and flattens message results.
  • New Features

    • Public SystemMessage type added for explicit system prompts.
    • Memory items now require message objects (plain string results removed).
  • Tests

    • Updated tests to validate async router return types alongside existing checks.
  • Chores

    • Bumped release version to 1.1.0-rc3.


@coderabbitai

coderabbitai Bot commented Nov 22, 2025

Walkthrough

Arium node execution now normalizes mixed node outputs via a new flattener and wraps resolved input strings as UserMessage; routers switched to MessageMemory/MessageMemoryItem and async return types; agent system prompts use SystemMessage; MessageMemoryItem.result type narrowed to BaseMessage; project version bumped to 1.1.0-rc3.

Changes

Router memory type refactor — flo_ai/flo_ai/arium/llm_router.py
Replaced BaseMemory with MessageMemory across signatures, decorators, and factories. Routers now consume List[MessageMemoryItem] from memory.get() and extract content via item.result.content. Prompt construction and truncation helpers adjusted to use List[str]. Public router callables updated to Callable[[MessageMemory, Optional[dict]], Awaitable[str]].

Arium node execution & message handling — flo_ai/flo_ai/arium/arium.py
Resolved input strings are wrapped as UserMessage(content=...). ForEachNode and AriumNode executions now await and collect results as lists of MessageMemoryItem, which are then normalized via the new flattener.

Agent system prompt message type — flo_ai/flo_ai/models/agent.py
System prompts are constructed as explicit SystemMessage objects instead of AssistantMessage with a system role. SystemMessage added to imports/public surface.

MessageMemoryItem typing change — flo_ai/flo_ai/arium/memory.py
MessageMemoryItem.__init__ and self.result narrowed to accept BaseMessage only (removed str as an accepted type).

Awaitable Literal support in base router — flo_ai/flo_ai/arium/base.py
Router return-type checker updated to detect Awaitable[Literal[...]], unwrap the Awaitable, and extract literal values from the inner Literal for validation.

Tests updated for async router types — flo_ai/tests/unit-tests/test_router_fix.py
Tests extended to accept and validate Awaitable[Literal[...]] return annotations, with error/assertion messages updated accordingly.

Version bump — flo_ai/pyproject.toml
Project version updated from 1.1.0-rc2 to 1.1.0-rc3.
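The Awaitable-unwrapping behavior described for base.py can be sketched with the standard typing helpers. The function below is an illustrative stand-in, not flo_ai's actual checker; the name extract_route_literals is hypothetical.

```python
from collections.abc import Awaitable
from typing import Literal, get_args, get_origin


def extract_route_literals(annotation):
    """Unwrap Awaitable[Literal[...]] (or accept a bare Literal) and
    return the allowed route names; return None for anything else."""
    # Async routers annotate their return type as Awaitable[Literal[...]],
    # so peel off the Awaitable layer first.
    if get_origin(annotation) is Awaitable:
        (annotation,) = get_args(annotation)
    # Whatever remains should be a Literal whose args are the route names.
    if get_origin(annotation) is Literal:
        return list(get_args(annotation))
    return None
```

For example, extract_route_literals(Awaitable[Literal['agent_a', 'agent_b']]) and extract_route_literals(Literal['agent_a', 'agent_b']) both yield the same route list, which is why the validation can treat sync and async routers uniformly.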

Sequence Diagram(s)

sequenceDiagram
  participant Runner as Arium.run
  participant Node as Node (ForEach/Arium)
  participant Flattener as _flatten_results
  participant Agent as Agent.run

  Note right of Runner: resolve inputs → wrap strings\nas UserMessage(content=...)
  Runner->>Node: await node.run(resolved_inputs, variables=...)
  Node-->>Runner: List[MessageMemoryItem|BaseMessage]
  Runner->>Flattener: _flatten_results(sequence)
  Flattener-->>Runner: List[BaseMessage] (normalized)
  alt node is Agent
    Runner->>Agent: Agent.run(..., variables={})
    Agent-->>Runner: Agent result (normalized similarly)
  end
  Runner->>Runner: continue execution with normalized results

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Pay extra attention to:
    • Consistent extraction of item.result.content across routers and any edge cases for missing/structured content.
    • Correctness of _flatten_results for mixed element types and that it preserves message semantics.
    • All Arium execution branches (telemetry vs non-telemetry, agent vs non-agent) performing identical normalization.
    • Impacts of MessageMemoryItem no longer accepting str (call sites and tests).
    • Awaitable/typing changes in router decorator and base return-type validation.

Poem

🐰 I hop through changed code with delight,
Wrapping messages snug, flattening each bite.
System prompts don a gentler name,
Memories tightened, no strings remain.
Version stamped — I twitch my nose, good night.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

Docstring Coverage ⚠️ Warning — Docstring coverage is 65.52%, which is below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.

✅ Passed checks (2 passed)

Description Check ✅ Passed — Check skipped: CodeRabbit's high-level summary is enabled.
Title Check ✅ Passed — The title accurately describes the primary change: fixing memory access handling in ForEachNode and AriumNode, which is a central focus of the changeset across multiple files.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c28e32e and 2cd68d9.

📒 Files selected for processing (3)
  • flo_ai/flo_ai/arium/base.py (2 hunks)
  • flo_ai/flo_ai/arium/llm_router.py (17 hunks)
  • flo_ai/tests/unit-tests/test_router_fix.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
flo_ai/flo_ai/arium/llm_router.py (1)
flo_ai/flo_ai/arium/memory.py (7)
  • ExecutionPlan (51-102)
  • StepStatus (12-19)
  • MessageMemory (132-152)
  • MessageMemoryItem (22-33)
  • get (111-112)
  • get (148-152)
  • get (175-182)
🔇 Additional comments (13)
flo_ai/flo_ai/arium/base.py (1)

63-77: LGTM! Async router return type handling is correct.

The unwrapping logic correctly detects Awaitable[Literal[...]] return types, extracts the inner Literal type, and returns the literal values for validation. This aligns with the async router implementations in llm_router.py.

flo_ai/tests/unit-tests/test_router_fix.py (2)

35-72: LGTM! Test correctly validates both async and sync router types.

The test logic properly handles both Awaitable[Literal[...]] (async routers) and Literal (sync routers) return type annotations, matching the implementation in base.py.


94-129: LGTM! Validation logic test is comprehensive.

The test correctly replicates the validation logic from base.py for both async and sync router return types, ensuring the type annotation extraction works as expected.

flo_ai/flo_ai/arium/llm_router.py (10)

9-16: LGTM! Imports updated correctly for MessageMemory integration.

The imports include Awaitable for async return types and MessageMemory/MessageMemoryItem for the new memory model.


63-68: LGTM! Abstract method signatures updated consistently.

The base class correctly defines get_routing_prompt and route to accept MessageMemory instead of generic memory, ensuring all implementations follow the new memory model.


186-196: LGTM! Memory access pattern is correct.

The method correctly retrieves MessageMemoryItem instances from MessageMemory, accesses item.result.content (safe since result is typed as BaseMessage), and formats the conversation history. The list comprehension produces a List[str] which matches the updated _truncate_conversation_for_tokens signature.


248-274: LGTM! Token truncation method signature corrected.

The method now correctly accepts List[str] instead of List[Any], matching the calling pattern at line 194-196 where a list of formatted strings is passed. This resolves the previous issue where a single string was being passed and treated as a list.


311-324: LGTM! TaskClassifierRouter correctly accesses MessageMemory.

The router properly retrieves conversation history from MessageMemory and accesses result.content on the latest MessageMemoryItem to extract the task description.


447-461: LGTM! ReflectionRouter memory access is correct.

The router correctly retrieves recent conversation history and formats it by accessing result.content on each MessageMemoryItem.


577-591: LGTM! PlanExecuteRouter memory handling is correct.

The router properly integrates with MessageMemory and safely accesses result.content on conversation items.


762-780: LGTM! ConversationAnalysisRouter memory access is consistent.

The router correctly retrieves and formats conversation history using the MessageMemory API, safely accessing result.content on each item.


832-961: LGTM! Router factory and annotations are correct.

The factory function correctly:

  1. Creates async router functions with proper async def syntax
  2. Annotates the return type as Awaitable[literal_type] for type checking
  3. Returns a properly typed Callable[[MessageMemory, Optional[dict]], Awaitable[str]]

This addresses the previous review concerns about async return type annotations.


1041-1191: LGTM! Convenience router functions have consistent async signatures.

All convenience functions (create_research_analysis_router, create_main_critic_reflection_router, create_plan_execute_router, and create_main_critic_flow_router) correctly return Callable[[MessageMemory, Optional[dict]], Awaitable[str]], maintaining consistency with the core router factory.
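A router satisfying the Callable[[MessageMemory, Optional[dict]], Awaitable[str]] shape described above might look like the sketch below. The MessageMemory/MessageMemoryItem classes here are minimal stand-ins for testing the shape, not flo_ai's implementations, and the routing rule is invented for illustration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Literal, Optional


@dataclass
class Message:
    content: str


@dataclass
class MessageMemoryItem:
    node: str
    result: Message  # result is a message object, never a plain string


@dataclass
class MessageMemory:
    items: List[MessageMemoryItem] = field(default_factory=list)

    def get(self) -> List[MessageMemoryItem]:
        return self.items


async def my_router(
    memory: MessageMemory, variables: Optional[dict] = None
) -> Literal['critic', 'researcher']:
    # Route based on the latest item's message content, accessed via
    # item.result.content as the new memory model prescribes.
    latest = memory.get()[-1]
    return 'critic' if 'review' in latest.result.content else 'researcher'
```

Calling the router yields an awaitable, so Arium awaits it rather than reading a plain return value.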


Comment @coderabbitai help to get the list of available commands and usage tips.

vizsatiz
vizsatiz previously approved these changes Nov 22, 2025

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (1)
flo_ai/flo_ai/arium/arium.py (1)

474-485: Foreach/Arium node result normalization correctly flattens MessageMemoryItem

The new handling for ForEachNode and AriumNode:

  • Awaits node.run(...) into foreach_results / arium_result.
  • Maps each element to item.result when it is a MessageMemoryItem, otherwise leaves it as-is.
  • Returns a plain List[BaseMessage | str] in both traced and non-traced paths.

This removes nested MessageMemoryItem layers and ensures downstream memory writes (_execute_graph’s _add_to_memory) store leaf results, which is exactly what the routers and other consumers expect.

You might consider extracting the normalization into a small helper (e.g., _flatten_results(sequence)) to deduplicate the foreach/arium logic, but that’s purely a readability win, not required for correctness.

Also applies to: 488-496, 571-580, 582-588

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5b0bbc3 and e358886.

📒 Files selected for processing (4)
  • flo_ai/flo_ai/arium/arium.py (3 hunks)
  • flo_ai/flo_ai/arium/llm_router.py (16 hunks)
  • flo_ai/flo_ai/models/agent.py (4 hunks)
  • flo_ai/pyproject.toml (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
flo_ai/flo_ai/models/agent.py (1)
flo_ai/flo_ai/models/chat_message.py (2)
  • SystemMessage (56-61)
  • TextMessageContent (43-45)
flo_ai/flo_ai/arium/arium.py (4)
flo_ai/flo_ai/models/chat_message.py (2)
  • UserMessage (65-70)
  • BaseMessage (49-52)
flo_ai/flo_ai/utils/variable_extractor.py (1)
  • resolve_variables (110-139)
flo_ai/flo_ai/arium/memory.py (1)
  • MessageMemoryItem (22-35)
flo_ai/flo_ai/arium/nodes.py (4)
  • run (35-50)
  • run (98-111)
  • run (181-208)
  • AriumNode (12-50)
flo_ai/flo_ai/arium/llm_router.py (1)
flo_ai/flo_ai/arium/memory.py (7)
  • ExecutionPlan (53-104)
  • StepStatus (12-19)
  • MessageMemory (134-154)
  • MessageMemoryItem (22-35)
  • get (113-114)
  • get (150-154)
  • get (177-184)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Analyze (actions)
🔇 Additional comments (4)
flo_ai/pyproject.toml (1)

3-3: Version bump aligns with PR scope

Updating the project version to 1.1.0-rc3 is consistent with the internal changes; nothing else in this file needs adjustment.

flo_ai/flo_ai/models/agent.py (1)

223-224: SystemMessage usage in tool paths looks correct

In _run_with_tools and the final-answer path, SystemMessage is constructed with a plain string (content=system_content / constant string), matching the SystemMessage definition and intended usage.

Also applies to: 401-404

flo_ai/flo_ai/arium/arium.py (1)

53-53: String input normalization into UserMessage is sound

Wrapping raw inputs: str as UserMessage(content=resolve_variables(inputs, variables)) cleanly reuses the variable resolver and ensures memory always sees a proper UserMessage as the first “input” node. Given resolve_variables is a no-op when variables is falsy, this is safe for both templated and plain inputs.
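A hedged sketch of that normalization step: UserMessage and resolve_variables exist in flo_ai, but the bodies below are simplified stand-ins (resolve_variables is assumed here to do str.format-style substitution; the real implementation may differ), and normalize_input is an invented wrapper name.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class UserMessage:
    content: str


def resolve_variables(template: str, variables: Optional[dict]) -> str:
    # Per the review note, this is a no-op when variables is falsy.
    if not variables:
        return template
    return template.format(**variables)


def normalize_input(inputs, variables: Optional[dict] = None):
    # Wrap raw string inputs so memory always sees a proper UserMessage
    # as the first "input" node; non-string inputs pass through.
    if isinstance(inputs, str):
        return UserMessage(content=resolve_variables(inputs, variables))
    return inputs
```

Both templated and plain string inputs are safe: the resolver leaves the string untouched when no variables are supplied.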

flo_ai/flo_ai/arium/llm_router.py (1)

834-852: Router factory and convenience API now consistently typed for MessageMemory

The migration of:

  • create_llm_router to return Callable[[MessageMemory], str],
  • the decorated router wrapper to accept memory: MessageMemory,
  • and all convenience creators (create_research_analysis_router, create_main_critic_reflection_router, create_plan_execute_router, create_main_critic_flow_router) to advertise Callable[[MessageMemory], str]

is consistent with the new MessageMemory-based architecture and with how Arium._execute_graph calls router functions.

Type annotations and doc examples (e.g., def my_smart_router(memory: MessageMemory) -> Literal[...]) now align with runtime behavior; no additional changes needed here.

Also applies to: 946-952, 985-986, 1039-1044, 1112-1119, 1140-1147, 1182-1189


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
flo_ai/flo_ai/arium/llm_router.py (1)

192-196: Guard against MessageMemoryItem.result not having .content and centralize text extraction

All router prompts now derive text with expressions like:

  • f'{item.node}: {item.result.content}' (SmartRouter)
  • latest_task = str(conversation[-1].result.content) (TaskClassifierRouter)
  • '\n'.join([msg.result.content for msg in conversation[-3:]]) (ReflectionRouter, PlanExecuteRouter)
  • f'Message {i+1}: {msg.result.content}' (ConversationAnalysisRouter)

But MessageMemoryItem.result is annotated as BaseMessage | str in memory.py, and upstream Arium logic can legitimately store plain strings as results. Even when result is a BaseMessage, its .content may be a TextMessageContent or other structured payload, so directly interpolating .content can either raise AttributeError (if result is str) or produce an unhelpful repr.

This is the same underlying concern raised in a previous review; with the broader MessageMemory adoption it’s now more important to harden. I’d strongly recommend introducing a single helper to safely stringify a MessageMemoryItem and reuse it across all routers, for example:

-from flo_ai.arium.memory import (
-    ExecutionPlan,
-    StepStatus,
-    MessageMemory,
-    MessageMemoryItem,
-)
+from flo_ai.arium.memory import (
+    ExecutionPlan,
+    StepStatus,
+    MessageMemory,
+    MessageMemoryItem,
+)
+from flo_ai.models.chat_message import BaseMessage, TextMessageContent
+
+
+def _memory_item_to_text(item: MessageMemoryItem) -> str:
+    """Extract readable text from a MessageMemoryItem.result."""
+    value = item.result
+    if isinstance(value, BaseMessage):
+        content = getattr(value, "content", "")
+        if isinstance(content, TextMessageContent):
+            return content.text
+        return str(content)
+    return str(value)

Then use it in all prompt builders, e.g.:

-        conversation_text = self._truncate_conversation_for_tokens(
-            [f'{item.node}: {item.result.content}' for item in conversation[-5:]]
-        )
+        conversation_text = self._truncate_conversation_for_tokens(
+            [f"{item.node}: {_memory_item_to_text(item)}" for item in conversation[-5:]]
+        )
-            latest_task = str(conversation[-1].result.content)
+            latest_task = _memory_item_to_text(conversation[-1])
-            conversation_text = '\n'.join(
-                [msg.result.content for msg in conversation[-3:]]
-            )
+            conversation_text = "\n".join(
+                _memory_item_to_text(msg) for msg in conversation[-3:]
+            )
-            conversation_text = '\n'.join(
-                [
-                    f'Message {i+1}: {msg.result.content}'
-                    for i, msg in enumerate(recent_messages)
-                ]
-            )
+            conversation_text = "\n".join(
+                f"Message {i+1}: {_memory_item_to_text(msg)}"
+                for i, msg in enumerate(recent_messages)
+            )

Alternatively, if your invariant truly is “result is always a BaseMessage”, it would be safer to tighten MessageMemoryItem.result’s type to BaseMessage so type checkers (and future maintainers) can rely on it.

Because this touches all routing paths, a latent str result would turn into runtime AttributeError in multiple routers, so I’d treat hardening here as fairly high priority.

#!/bin/bash
# Verify what types are actually being stored in MessageMemoryItem.result.
echo "=== MessageMemoryItem initializations ==="
rg -n "MessageMemoryItem\(" -S flo_ai | sed -n '1,120p'

echo
echo "=== Calls to MessageMemory.add ==="
rg -n "\.add\(" flo_ai/flo_ai/arium -S -A2 -B2

echo
echo "=== Direct assignments to .result (if any) ==="
rg -n "\.result\s*=" flo_ai -S -A1 -B1

Also applies to: 248-249, 317-322, 453-459, 583-589, 768-777

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e358886 and 049c01a.

📒 Files selected for processing (3)
  • flo_ai/flo_ai/arium/arium.py (4 hunks)
  • flo_ai/flo_ai/arium/llm_router.py (17 hunks)
  • flo_ai/flo_ai/models/agent.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • flo_ai/flo_ai/arium/arium.py
🧰 Additional context used
🧬 Code graph analysis (2)
flo_ai/flo_ai/models/agent.py (1)
flo_ai/flo_ai/models/chat_message.py (1)
  • SystemMessage (56-61)
flo_ai/flo_ai/arium/llm_router.py (1)
flo_ai/flo_ai/arium/memory.py (7)
  • ExecutionPlan (53-104)
  • StepStatus (12-19)
  • MessageMemory (134-154)
  • MessageMemoryItem (22-35)
  • get (113-114)
  • get (150-154)
  • get (177-184)
🔇 Additional comments (1)
flo_ai/flo_ai/models/agent.py (1)

12-12: SystemMessage usage now matches its type and unifies system prompts

Importing SystemMessage and using it with plain string content for both the main system prompt and the final “provide a final answer” instruction is consistent with SystemMessage’s content: str definition and cleans up the previous AssistantMessage/TextMessageContent mismatch. No further changes needed here.

Also applies to: 143-143, 221-221, 400-402


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
flo_ai/flo_ai/arium/llm_router.py (1)

248-274: Minor: Redundant str() calls in truncation logic.

The method now correctly accepts List[str], but lines 260, 264, and 270 apply str(msg) to elements that are already strings. This is harmless but redundant.

Apply this diff to remove redundant conversions:

     if not messages:
         return ''
 
     # Start with the most recent message
     truncated_messages = [messages[-1]]
-    current_text = str(messages[-1])
+    current_text = messages[-1]
 
     # Add older messages if we have space
     for msg in reversed(messages[:-1]):
-        msg_text = str(msg)
+        msg_text = msg
         # Rough token estimation (4 chars per token is a common approximation)
         estimated_tokens = len(current_text + '\n' + msg_text) // 4
 
         if estimated_tokens <= max_tokens:
             truncated_messages.insert(0, msg)
-            current_text = '\n'.join([str(m) for m in truncated_messages])
+            current_text = '\n'.join(truncated_messages)
         else:
             break
 
-    return '\n'.join([str(msg) for msg in truncated_messages])
+    return '\n'.join(truncated_messages)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 049c01a and c28e32e.

📒 Files selected for processing (2)
  • flo_ai/flo_ai/arium/llm_router.py (17 hunks)
  • flo_ai/flo_ai/arium/memory.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
flo_ai/flo_ai/arium/memory.py (1)
flo_ai/flo_ai/models/chat_message.py (1)
  • BaseMessage (49-52)
flo_ai/flo_ai/arium/llm_router.py (1)
flo_ai/flo_ai/arium/memory.py (7)
  • ExecutionPlan (51-102)
  • StepStatus (12-19)
  • MessageMemory (132-152)
  • MessageMemoryItem (22-33)
  • get (111-112)
  • get (148-152)
  • get (175-182)
🔇 Additional comments (5)
flo_ai/flo_ai/arium/memory.py (1)

23-26: LGTM! Type narrowing improves type safety.

Narrowing MessageMemoryItem.result from BaseMessage | str to BaseMessage aligns with your clarification that results are always stored as BaseMessage instances. This change:

  • Eliminates potential AttributeError when accessing .result.content in routers
  • Makes the code's contract explicit
  • Improves static type checking

Based on learnings from previous review.

flo_ai/flo_ai/arium/llm_router.py (4)

9-16: LGTM! Imports updated for new memory model.

The addition of Awaitable and the switch to MessageMemory/MessageMemoryItem align with the refactored memory model and support proper async type annotations.


65-65: Signature update consistent with new memory model.

All router methods now explicitly require MessageMemory instead of the generic BaseMemory. This is a breaking change but aligns with the refactored memory architecture where routers access message-specific properties like item.result.content.


195-195: Access to .result.content now type-safe.

With MessageMemoryItem.result narrowed to BaseMessage (no longer BaseMessage | str), accessing .content throughout the routers is now safe. The previous concern about AttributeError: 'str' object has no attribute 'content' is resolved.


1043-1043: LGTM! Convenience functions properly typed.

The return type annotations for create_research_analysis_router and other convenience functions correctly declare Callable[[MessageMemory, Optional[dict]], Awaitable[str]], properly reflecting their async nature.

@vishnurk6247 vishnurk6247 merged commit b1dd9b2 into develop Nov 22, 2025
6 checks passed
@vishnurk6247 vishnurk6247 deleted the fix/for-each-node-memory branch November 22, 2025 10:28
thomastomy5 pushed a commit that referenced this pull request Apr 27, 2026
* fix: memory access in arium - foreach node and arium node

* fix: resolve review comments

* fix: remove string type for result in message memory

* fix: change return type annotation of router function