From 93b13b0219d66d11a8ae4cec08466e12fe131681 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Thu, 11 Sep 2025 11:36:14 +0530 Subject: [PATCH 1/4] arium event listener --- flo_ai/flo_ai/arium/arium.py | 196 +++++++++++++++++++++++++++++++--- flo_ai/flo_ai/arium/events.py | 93 ++++++++++++++++ 2 files changed, 274 insertions(+), 15 deletions(-) create mode 100644 flo_ai/flo_ai/arium/events.py diff --git a/flo_ai/flo_ai/arium/arium.py b/flo_ai/flo_ai/arium/arium.py index 2e7679f8..92bfdff8 100644 --- a/flo_ai/flo_ai/arium/arium.py +++ b/flo_ai/flo_ai/arium/arium.py @@ -1,10 +1,11 @@ from flo_ai.arium.base import BaseArium from flo_ai.arium.memory import MessageMemory, BaseMemory from flo_ai.llm.base_llm import ImageMessage -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Optional, Callable from flo_ai.models.agent import Agent from flo_ai.tool.base_tool import Tool from flo_ai.arium.models import StartNode, EndNode +from flo_ai.arium.events import AriumEventType, AriumEvent from flo_ai.utils.logger import logger from flo_ai.utils.variable_extractor import ( extract_variables_from_inputs, @@ -13,6 +14,7 @@ resolve_variables, ) import asyncio +import time class Arium(BaseArium): @@ -29,7 +31,21 @@ async def run( self, inputs: List[str | ImageMessage], variables: Optional[Dict[str, Any]] = None, + event_callback: Optional[Callable[[AriumEvent], None]] = None, + filtered_events: Optional[List[AriumEventType]] = None, ): + """ + Execute the Arium workflow with optional event monitoring. + + Args: + inputs: Input messages for the workflow + variables: Variable substitutions for templated prompts + event_callback: Function to call for each event (if None, no events are emitted) + filtered_events: List of event types to listen for (defaults to all) + + Returns: + List of workflow execution results + """ if not self.is_compiled: raise ValueError('Arium is not compiled') @@ -39,16 +55,71 @@ async def run( if not self.nodes: raise ValueError('Arium has no nodes') - # Extract and validate variables from inputs and all agents - self._extract_and_validate_variables(inputs, variables) + # Set default filtered events to all event types if not specified + if filtered_events is None: + filtered_events = list(AriumEventType) + + # Emit workflow started event + self._emit_event( + AriumEventType.WORKFLOW_STARTED, event_callback, filtered_events + ) + + try: + # Extract and validate variables from inputs and all agents + self._extract_and_validate_variables(inputs, variables) + + # Resolve variables in inputs and agent prompts + resolved_inputs = self._resolve_inputs(inputs, variables) + self._resolve_agent_prompts(variables) + + # Execute the workflow with event support + result = await self._execute_graph( + resolved_inputs, event_callback, filtered_events + ) + + # Emit workflow completed event + self._emit_event( + AriumEventType.WORKFLOW_COMPLETED, event_callback, filtered_events + ) - # Resolve variables in inputs and agent prompts - resolved_inputs = self._resolve_inputs(inputs, variables) - self._resolve_agent_prompts(variables) + return result - return await self._execute_graph(resolved_inputs) + except Exception as e: + # Emit workflow failed event + self._emit_event( + AriumEventType.WORKFLOW_FAILED, + event_callback, + filtered_events, + error=str(e), + ) + raise - async def _execute_graph(self, inputs: List[str | ImageMessage]): + def _emit_event( + self, + event_type: AriumEventType, + callback: Optional[Callable[[AriumEvent], None]], + filtered_events: Optional[List[AriumEventType]], + **kwargs, + ) -> None: + """ + Emit an event if callback is provided and event type is in filtered list. + + Args: + event_type: The type of event to emit + callback: Function to call with the event (if None, no event is emitted) + filtered_events: List of event types to listen for + **kwargs: Additional event data (node_name, error, etc.) + """ + if callback and event_type in filtered_events: + event = AriumEvent(event_type=event_type, timestamp=time.time(), **kwargs) + callback(event) + + async def _execute_graph( + self, + inputs: List[str | ImageMessage], + event_callback: Optional[Callable[[AriumEvent], None]] = None, + filtered_events: Optional[List[AriumEventType]] = None, + ): [self.memory.add(msg) for msg in inputs] current_node = self.nodes[self.start_node_name] @@ -91,7 +162,9 @@ async def _execute_graph(self, inputs: List[str | ImageMessage]): f'Executing node: {current_node.name} (iteration {iteration_count})' ) # execute current node - result = await self._execute_node(current_node) + result = await self._execute_node( + current_node, event_callback, filtered_events + ) # update results to memory self._add_to_memory(result) @@ -120,6 +193,23 @@ async def _execute_graph(self, inputs: List[str | ImageMessage]): else: next_node_name = router_result + # Emit router decision event + self._emit_event( + AriumEventType.ROUTER_DECISION, + event_callback, + filtered_events, + node_name=current_node.name, + router_choice=next_node_name, + ) + + # Emit edge traversed event + self._emit_event( + AriumEventType.EDGE_TRAVERSED, + event_callback, + filtered_events, + node_name=current_node.name, + ) + # find next edge # TODO: next_node_name might not be in self.edges if it's the end node. Handle this case next_edge = ( @@ -203,16 +293,92 @@ def _resolve_agent_prompts(self, variables: Dict[str, Any]) -> None: node.system_prompt = resolve_variables(node.system_prompt, variables) node.resolved_variables = True - async def _execute_node(self, node: Agent | Tool | StartNode | EndNode): + async def _execute_node( + self, + node: Agent | Tool | StartNode | EndNode, + event_callback: Optional[Callable[[AriumEvent], None]] = None, + filtered_events: Optional[List[AriumEventType]] = None, + ): + """ + Execute a single node with optional event emission. + + Args: + node: The node to execute + event_callback: Function to call for events (if None, no events are emitted) + filtered_events: List of event types to listen for + + Returns: + The result of node execution + """ + # Determine node type for events if isinstance(node, Agent): - # Variables are already resolved, pass empty dict to avoid re-processing - return await node.run(self.memory.get(), variables={}) + node_type = 'agent' elif isinstance(node, Tool): - return await node.execute(self.memory.get()) + node_type = 'tool' elif isinstance(node, StartNode): - return None + node_type = 'start' elif isinstance(node, EndNode): - return None + node_type = 'end' + else: + node_type = 'unknown' + + # Emit node started event + self._emit_event( + AriumEventType.NODE_STARTED, + event_callback, + filtered_events, + node_name=node.name, + node_type=node_type, + ) + + start_time = time.time() + + try: + # Execute the node based on its type + if isinstance(node, Agent): + # Variables are already resolved, pass empty dict to avoid re-processing + result = await node.run(self.memory.get(), variables={}) + elif isinstance(node, Tool): + result = await node.execute() + elif isinstance(node, StartNode): + result = None + elif isinstance(node, EndNode): + result = None + else: + result = None + + # Calculate execution time + execution_time = time.time() - start_time + + # Emit node completed event + self._emit_event( + AriumEventType.NODE_COMPLETED, + event_callback, + filtered_events, + node_name=node.name, + node_type=node_type, + execution_time=execution_time, + ) + + return result + + except Exception as e: + # Calculate execution time even on failure + execution_time = time.time() - start_time + + # Emit node failed event + self._emit_event( + AriumEventType.NODE_FAILED, + event_callback, + filtered_events, + node_name=node.name, + node_type=node_type, + execution_time=execution_time, + error=str(e), + ) + + # Re-raise the exception + raise def _add_to_memory(self, result: str): # TODO result will be None for start and end nodes diff --git a/flo_ai/flo_ai/arium/events.py b/flo_ai/flo_ai/arium/events.py new file mode 100644 index 00000000..0ed6d972 --- /dev/null +++ b/flo_ai/flo_ai/arium/events.py @@ -0,0 +1,93 @@ +""" +Event system for Arium workflow execution monitoring. + +This module provides event types and data structures for tracking workflow execution, +including node starts/completions, router decisions, and workflow lifecycle events. +""" + +from enum import Enum +from dataclasses import dataclass +from typing import Optional +import time + + +class AriumEventType(Enum): + """Enumeration of all possible Arium workflow events.""" + + WORKFLOW_STARTED = 'workflow_started' + WORKFLOW_COMPLETED = 'workflow_completed' + WORKFLOW_FAILED = 'workflow_failed' + NODE_STARTED = 'node_started' + NODE_COMPLETED = 'node_completed' + NODE_FAILED = 'node_failed' + ROUTER_DECISION = 'router_decision' + EDGE_TRAVERSED = 'edge_traversed' + + +@dataclass +class AriumEvent: + """ + Data structure representing a single workflow execution event. + + Attributes: + event_type: The type of event that occurred + timestamp: Unix timestamp when the event occurred + node_name: Name of the node involved (if applicable) + node_type: Type of node ('agent', 'tool', 'start', 'end') + execution_time: Time taken for node execution in seconds + error: Error message if the event represents a failure + router_choice: The node chosen by a router decision + metadata: Additional event-specific data + """ + + event_type: AriumEventType + timestamp: float + node_name: Optional[str] = None + node_type: Optional[str] = None + execution_time: Optional[float] = None + error: Optional[str] = None + router_choice: Optional[str] = None + metadata: Optional[dict] = None + + +def default_event_callback(event: AriumEvent) -> None: + """ + Default callback function that prints workflow events to console with formatting. + + This provides useful output for debugging and monitoring workflow execution + without requiring any custom callback setup. + + Args: + event: The AriumEvent to process and display + """ + timestamp = time.strftime('%H:%M:%S', time.localtime(event.timestamp)) + + if event.event_type == AriumEventType.WORKFLOW_STARTED: + print(f'🚀 [{timestamp}] Workflow started') + + elif event.event_type == AriumEventType.WORKFLOW_COMPLETED: + print(f'✅ [{timestamp}] Workflow completed') + + elif event.event_type == AriumEventType.WORKFLOW_FAILED: + print(f'❌ [{timestamp}] Workflow failed: {event.error}') + + elif event.event_type == AriumEventType.NODE_STARTED: + node_desc = ( + f'{event.node_type}: {event.node_name}' + if event.node_type + else event.node_name + ) + print(f'⚡ [{timestamp}] Started {node_desc}') + + elif event.event_type == AriumEventType.NODE_COMPLETED: + duration = f' ({event.execution_time:.2f}s)' if event.execution_time else '' + print(f'✅ [{timestamp}] Completed {event.node_name}{duration}') + + elif event.event_type == AriumEventType.NODE_FAILED: + print(f'❌ [{timestamp}] Failed {event.node_name}: {event.error}') + + elif event.event_type == AriumEventType.ROUTER_DECISION: + print(f'🔀 [{timestamp}] Router chose: {event.router_choice}') + + elif event.event_type == AriumEventType.EDGE_TRAVERSED: + print(f'➡️ [{timestamp}] Moving from {event.node_name} to next node') From bb5d49a3e83da548a30a46eb7575e23944fc9716 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Thu, 11 Sep 2025 11:50:02 +0530 Subject: [PATCH 2/4] using logger instead of print statements in default callback - also exposing event system function and classes from arium __init__ --- flo_ai/flo_ai/arium/__init__.py | 5 +++++ flo_ai/flo_ai/arium/events.py | 17 +++++++++-------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/flo_ai/flo_ai/arium/__init__.py b/flo_ai/flo_ai/arium/__init__.py index 04f27bd5..fa372477 100644 --- a/flo_ai/flo_ai/arium/__init__.py +++ b/flo_ai/flo_ai/arium/__init__.py @@ -3,6 +3,7 @@ from .builder import AriumBuilder, create_arium from .memory import MessageMemory, BaseMemory from .models import StartNode, EndNode, Edge +from .events import AriumEventType, AriumEvent, default_event_callback from .llm_router import ( BaseLLMRouter, SmartRouter, @@ -22,6 +23,10 @@ 'StartNode', 'EndNode', 'Edge', + # Event system + 'AriumEventType', + 'AriumEvent', + 'default_event_callback', # LLM Router functionality 'BaseLLMRouter', 'SmartRouter', diff --git a/flo_ai/flo_ai/arium/events.py b/flo_ai/flo_ai/arium/events.py index 0ed6d972..44dee2d7 100644 --- a/flo_ai/flo_ai/arium/events.py +++ b/flo_ai/flo_ai/arium/events.py @@ -9,6 +9,7 @@ from dataclasses import dataclass from typing import Optional import time +from flo_ai.utils.logger import logger class AriumEventType(Enum): @@ -63,13 +64,13 @@ def default_event_callback(event: AriumEvent) -> None: timestamp = time.strftime('%H:%M:%S', time.localtime(event.timestamp)) if event.event_type == AriumEventType.WORKFLOW_STARTED: - print(f'🚀 [{timestamp}] Workflow started') + logger.info(f'🚀 [{timestamp}] Workflow started') elif event.event_type == AriumEventType.WORKFLOW_COMPLETED: - print(f'✅ [{timestamp}] Workflow completed') + logger.info(f'✅ [{timestamp}] Workflow completed') elif event.event_type == AriumEventType.WORKFLOW_FAILED: - print(f'❌ [{timestamp}] Workflow failed: {event.error}') + logger.error(f'❌ [{timestamp}] Workflow failed: {event.error}') elif event.event_type == AriumEventType.NODE_STARTED: node_desc = ( @@ -77,17 +78,17 @@ def default_event_callback(event: AriumEvent) -> None: if event.node_type else event.node_name ) - print(f'⚡ [{timestamp}] Started {node_desc}') + logger.info(f'⚡ [{timestamp}] Started {node_desc}') elif event.event_type == AriumEventType.NODE_COMPLETED: duration = f' ({event.execution_time:.2f}s)' if event.execution_time else '' - print(f'✅ [{timestamp}] Completed {event.node_name}{duration}') + logger.info(f'✅ [{timestamp}] Completed {event.node_name}{duration}') elif event.event_type == AriumEventType.NODE_FAILED: - print(f'❌ [{timestamp}] Failed {event.node_name}: {event.error}') + logger.error(f'❌ [{timestamp}] Failed {event.node_name}: {event.error}') elif event.event_type == AriumEventType.ROUTER_DECISION: - print(f'🔀 [{timestamp}] Router chose: {event.router_choice}') + logger.info(f'🔀 [{timestamp}] Router chose: {event.router_choice}') elif event.event_type == AriumEventType.EDGE_TRAVERSED: - print(f'➡️ [{timestamp}] Moving from {event.node_name} to next node') + logger.info(f'➡️ [{timestamp}] Moving from {event.node_name} to next node') From 3aea1155720b533de4603a025e58edc2bf548b04 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Thu, 11 Sep 2025 12:00:07 +0530 Subject: [PATCH 3/4] renamed filtered_events to events_filter --- flo_ai/flo_ai/arium/arium.py | 44 +++++++++++++++++------------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/flo_ai/flo_ai/arium/arium.py b/flo_ai/flo_ai/arium/arium.py index 92bfdff8..ecde45fa 100644 --- a/flo_ai/flo_ai/arium/arium.py +++ b/flo_ai/flo_ai/arium/arium.py @@ -32,7 +32,7 @@ async def run( inputs: List[str | ImageMessage], variables: Optional[Dict[str, Any]] = None, event_callback: Optional[Callable[[AriumEvent], None]] = None, - filtered_events: Optional[List[AriumEventType]] = None, + events_filter: Optional[List[AriumEventType]] = None, ): """ Execute the Arium workflow with optional event monitoring. @@ -41,7 +41,7 @@ async def run( inputs: Input messages for the workflow variables: Variable substitutions for templated prompts event_callback: Function to call for each event (if None, no events are emitted) - filtered_events: List of event types to listen for (defaults to all) + events_filter: List of event types to listen for (defaults to all) Returns: List of workflow execution results @@ -55,14 +55,12 @@ async def run( if not self.nodes: raise ValueError('Arium has no nodes') - # Set default filtered events to all event types if not specified - if filtered_events is None: - filtered_events = list(AriumEventType) + # Set default event filters to all event types if not specified + if events_filter is None: + events_filter = list(AriumEventType) # Emit workflow started event - self._emit_event( - AriumEventType.WORKFLOW_STARTED, event_callback, filtered_events - ) + self._emit_event(AriumEventType.WORKFLOW_STARTED, event_callback, events_filter) try: # Extract and validate variables from inputs and all agents @@ -74,12 +72,12 @@ async def run( # Execute the workflow with event support result = await self._execute_graph( - resolved_inputs, event_callback, filtered_events + resolved_inputs, event_callback, events_filter ) # Emit workflow completed event self._emit_event( - AriumEventType.WORKFLOW_COMPLETED, event_callback, filtered_events + AriumEventType.WORKFLOW_COMPLETED, event_callback, events_filter ) return result @@ -89,7 +87,7 @@ async def run( self._emit_event( AriumEventType.WORKFLOW_FAILED, event_callback, - filtered_events, + events_filter, error=str(e), ) raise @@ -98,7 +96,7 @@ def _emit_event( self, event_type: AriumEventType, callback: Optional[Callable[[AriumEvent], None]], - filtered_events: Optional[List[AriumEventType]], + events_filter: Optional[List[AriumEventType]], **kwargs, ) -> None: """ @@ -107,10 +105,10 @@ def _emit_event( Args: event_type: The type of event to emit callback: Function to call with the event (if None, no event is emitted) - filtered_events: List of event types to listen for + events_filter: List of event types to listen for **kwargs: Additional event data (node_name, error, etc.) """ - if callback and event_type in filtered_events: + if callback and event_type in events_filter: event = AriumEvent(event_type=event_type, timestamp=time.time(), **kwargs) callback(event) @@ -118,7 +116,7 @@ async def _execute_graph( self, inputs: List[str | ImageMessage], event_callback: Optional[Callable[[AriumEvent], None]] = None, - filtered_events: Optional[List[AriumEventType]] = None, + events_filter: Optional[List[AriumEventType]] = None, ): [self.memory.add(msg) for msg in inputs] @@ -163,7 +161,7 @@ async def _execute_graph( ) # execute current node result = await self._execute_node( - current_node, event_callback, filtered_events + current_node, event_callback, events_filter ) # update results to memory @@ -197,7 +195,7 @@ async def _execute_graph( self._emit_event( AriumEventType.ROUTER_DECISION, event_callback, - filtered_events, + events_filter, node_name=current_node.name, router_choice=next_node_name, ) @@ -206,7 +204,7 @@ async def _execute_graph( self._emit_event( AriumEventType.EDGE_TRAVERSED, event_callback, - filtered_events, + events_filter, node_name=current_node.name, ) @@ -297,7 +295,7 @@ async def _execute_node( self, node: Agent | Tool | StartNode | EndNode, event_callback: Optional[Callable[[AriumEvent], None]] = None, - filtered_events: Optional[List[AriumEventType]] = None, + events_filter: Optional[List[AriumEventType]] = None, ): """ Execute a single node with optional event emission. @@ -305,7 +303,7 @@ async def _execute_node( Args: node: The node to execute event_callback: Function to call for events (if None, no events are emitted) - filtered_events: List of event types to listen for + events_filter: List of event types to listen for Returns: The result of node execution @@ -326,7 +324,7 @@ async def _execute_node( self._emit_event( AriumEventType.NODE_STARTED, event_callback, - filtered_events, + events_filter, node_name=node.name, node_type=node_type, ) @@ -354,7 +352,7 @@ async def _execute_node( self._emit_event( AriumEventType.NODE_COMPLETED, event_callback, - filtered_events, + events_filter, node_name=node.name, node_type=node_type, execution_time=execution_time, @@ -370,7 +368,7 @@ async def _execute_node( self._emit_event( AriumEventType.NODE_FAILED, event_callback, - filtered_events, + events_filter, node_name=node.name, node_type=node_type, execution_time=execution_time, From d45ee31679127f0a74dad1e54b66ed1f7750e3c9 Mon Sep 17 00:00:00 2001 From: rootflo-hardik Date: Thu, 11 Sep 2025 12:13:35 +0530 Subject: [PATCH 4/4] event_listener in readme --- flo_ai/flo_ai/arium/README.md | 151 +++++++++++++++++++++++++++++++++- 1 file changed, 148 insertions(+), 3 deletions(-) diff --git a/flo_ai/flo_ai/arium/README.md b/flo_ai/flo_ai/arium/README.md index 6de5fa3e..e2c3f3dd 100644 --- a/flo_ai/flo_ai/arium/README.md +++ b/flo_ai/flo_ai/arium/README.md @@ -25,9 +25,154 @@ result = await (AriumBuilder() - **Easy Connections**: Simple `connect()` method for linear workflows - **Flexible Routing**: Full support for custom router functions - **🧠 LLM-Powered Routing**: Intelligent routing using Large Language Models +- **📊 Event Monitoring**: Real-time workflow execution monitoring with customizable callbacks - **Visualization**: Built-in graph visualization support - **Reusable Workflows**: Build once, run multiple times +## 📊 Event Monitoring + +Arium provides comprehensive event monitoring capabilities that allow you to track workflow execution in real-time. This is invaluable for debugging, performance monitoring, and understanding how your workflows behave. + +### Key Features + +- **Real-time Events**: Monitor workflow lifecycle, node execution, and routing decisions +- **Custom Callbacks**: Write your own event handlers for logging, metrics, or debugging +- **Event Filtering**: Choose which types of events to monitor +- **Zero Configuration**: Works out of the box with sensible defaults +- **Performance Tracking**: Automatic execution time measurement for nodes + +### Available Event Types + +| Event Type | Description | +|------------|-------------| +| `WORKFLOW_STARTED` | Fired when workflow execution begins | +| `WORKFLOW_COMPLETED` | Fired when workflow completes successfully | +| `WORKFLOW_FAILED` | Fired when workflow fails with an error | +| `NODE_STARTED` | Fired when a node (agent/tool) begins execution | +| `NODE_COMPLETED` | Fired when a node completes successfully | +| `NODE_FAILED` | Fired when a node fails with an error | +| `ROUTER_DECISION` | Fired when a router chooses the next node | +| `EDGE_TRAVERSED` | Fired when moving from one node to another | + +### Basic Usage + +```python +from flo_ai.arium import AriumBuilder, AriumEventType, default_event_callback + +# Enable event monitoring with default callback (logs to console) +arium = (AriumBuilder() + .add_agent(my_agent) + .add_tool(my_tool) + .start_with(my_agent) + .connect(my_agent, my_tool) + .end_with(my_tool) + .build()) + +result = await arium.run( + inputs=["Process this"], + event_callback=default_event_callback +) +``` + +### Custom Event Callbacks + +```python +def my_event_handler(event): + """Custom event handler for specialized logging""" + print(f"🔔 {event.event_type.value}: {event.node_name}") + if event.execution_time: + print(f" ⏱️ Took {event.execution_time:.2f}s") + if event.error: + print(f" ❌ Error: {event.error}") + +# Use your custom callback +arium = (AriumBuilder() + .add_agent(my_agent) + .start_with(my_agent) + .end_with(my_agent) + .build()) + +result = await arium.run( + inputs=["Hello world"], + event_callback=my_event_handler +) +``` + +### Event Filtering + +Monitor only specific types of events by providing an `events_filter`: + +```python +from flo_ai.arium import AriumEventType, default_event_callback + +# Only monitor workflow lifecycle and node completions +important_events = [ + AriumEventType.WORKFLOW_STARTED, + AriumEventType.NODE_COMPLETED, + AriumEventType.WORKFLOW_COMPLETED, + AriumEventType.WORKFLOW_FAILED +] + +arium = (AriumBuilder() + .add_agent(my_agent) + .start_with(my_agent) + .end_with(my_agent) + .build()) + +result = await arium.run( + inputs=["Process this"], + event_callback=default_event_callback, + events_filter=important_events +) +``` + +### Silent Execution + +By default, workflows run silently. You can use either approach: + +```python +# Option 1: Use build_and_run() for convenience (no events) +result = await (AriumBuilder() + .add_agent(my_agent) + .start_with(my_agent) + .end_with(my_agent) + .build_and_run(["Silent execution"])) + +# Option 2: Use build() then run() (no events) +arium = (AriumBuilder() + .add_agent(my_agent) + .start_with(my_agent) + .end_with(my_agent) + .build()) + +result = await arium.run(["Silent execution"]) +``` + +### Event Monitoring vs Build-and-Run + +**Important**: Event monitoring is only available through the `Arium.run()` method. The AriumBuilder's `build_and_run()` convenience method does not support event parameters. + +- **For event monitoring**: Use `.build()` then `arium.run(inputs, event_callback=...)` +- **For simple execution**: Use `.build_and_run(inputs)` for convenience +- **For reusable workflows**: Use `.build()` once, then call `arium.run()` multiple times + +### Event Data Structure + +Each event is an `AriumEvent` object with the following properties: + +```python +@dataclass +class AriumEvent: + event_type: AriumEventType # Type of event + timestamp: float # Unix timestamp + node_name: Optional[str] = None # Name of involved node + node_type: Optional[str] = None # Type: 'agent', 'tool', 'start', 'end' + execution_time: Optional[float] = None # Node execution time in seconds + error: Optional[str] = None # Error message if applicable + router_choice: Optional[str] = None # Node chosen by router + metadata: Optional[dict] = None # Additional event data +``` + ## API Reference ### AriumBuilder Methods @@ -44,7 +189,7 @@ result = await (AriumBuilder() | `connect(from_node, to_node)` | Simple connection between nodes | | `add_edge(from_node, to_nodes, router)` | Add edge with optional router | | `build()` | Build the Arium instance | -| `build_and_run(inputs)` | Build and run in one step | +| `build_and_run(inputs, variables=None)` | Build and run in one step (no event monitoring support) | | `visualize(output_path, title)` | Generate workflow visualization | | `reset()` | Reset builder to start fresh | @@ -98,8 +243,8 @@ arium = (AriumBuilder() .end_with(my_agent) .build()) -# Run multiple times -result1 = await arium.run(["Input 1"]) +# Run multiple times (with optional event monitoring) +result1 = await arium.run(["Input 1"], event_callback=default_event_callback) result2 = await arium.run(["Input 2"]) ```