Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
435 changes: 3 additions & 432 deletions agent_core/core/database_interface.py

Large diffs are not rendered by default.

63 changes: 0 additions & 63 deletions agent_core/core/impl/action/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,6 @@ async def execute_action(
if not parent_id and self._get_parent_id:
parent_id = self._get_parent_id()

# Persist RUNNING status using fast append-only logging
await self.db_interface.log_action_start_async(
run_id=run_id,
session_id=session_id,
parent_id=parent_id,
name=action.name,
action_type=action.action_type,
inputs=input_data,
started_at=started_at,
)

# Call on_action_start hook if provided
if self._on_action_start:
try:
Expand Down Expand Up @@ -330,14 +319,6 @@ async def execute_action(
state.get_agent_property("action_count", 0) + 1
)

# Persist final status using fast append-only logging
await self.db_interface.log_action_end_async(
run_id=run_id,
outputs=outputs,
status=status,
ended_at=ended_at,
)

# Call on_action_end hook if provided
if self._on_action_end:
try:
Expand Down Expand Up @@ -456,34 +437,6 @@ async def execute_single(action: Action, input_data: Dict, action_session_id: st
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------

def _log_action_history(
self,
*,
run_id: str,
action: Action,
inputs: Optional[Dict],
outputs: Optional[Dict],
status: str,
started_at: Optional[str],
ended_at: Optional[str],
parent_id: Optional[str],
session_id: Optional[str],
) -> None:
"""Upsert a single history document keyed by *runId*."""
self.db_interface.upsert_action_history(
run_id,
session_id=session_id,
parent_id=parent_id,
name=action.name,
action_type=action.action_type,
status=status,
inputs=inputs,
outputs=outputs,
started_at=started_at,
ended_at=ended_at,
)

def _log_event_stream(
self,
is_gui_task: bool,
Expand Down Expand Up @@ -638,19 +591,3 @@ async def run_observe_step(self, action: Action, action_output: Dict) -> Dict[st
attempt += 1

return {"success": False, "message": "Observation failed or timed out."}

# ------------------------------------------------------------------
# Helper
# ------------------------------------------------------------------

def get_action_history(self, limit: int = 10) -> List[Dict[str, Any]]:
"""
Retrieve recent action history entries.

Args:
limit: Maximum number of history documents to return.

Returns:
List[Dict[str, Any]]: Collection of run metadata.
"""
return self.db_interface.get_action_history(limit)
89 changes: 75 additions & 14 deletions agent_core/core/impl/context/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,14 +538,82 @@ def get_user_info(self) -> str:
"""Get current user info for user prompts (WCA-specific via hook)."""
return self._get_user_info()

def _build_memory_query(self, query: Optional[str], session_id: Optional[str]) -> Optional[str]:
"""Build a semantic query for memory retrieval.

Combines task instruction with recent conversation messages (both user
and agent) to provide better context for memory search.

Args:
query: Optional explicit query string.
session_id: Optional session ID for session-specific state lookup.

Returns:
A query string suitable for semantic memory search, or None if no context.
"""
# Get task instruction as the base query
session = get_session_or_none(session_id)
if session and session.current_task:
task_instruction = session.current_task.instruction
else:
current_task = get_state().current_task
task_instruction = current_task.instruction if current_task else None

if not task_instruction:
# Fall back to explicit query if no task
return query if query else None

# Get recent conversation messages for additional context
recent_context = self._get_recent_conversation_for_memory(session_id, limit=5)

if recent_context:
return f"{task_instruction}\n\nRecent conversation:\n{recent_context}"
else:
return task_instruction

def _get_recent_conversation_for_memory(self, session_id: Optional[str], limit: int = 5) -> str:
"""Get recent conversation messages for memory query context.

Args:
session_id: Optional session ID for session-specific event stream.
limit: Maximum number of messages to include.

Returns:
Formatted string of recent user and agent messages.
"""
try:
event_stream_manager = self.state_manager.event_stream_manager
if not event_stream_manager:
return ""

# Get messages from conversation history (includes both user and agent)
recent_messages = event_stream_manager.get_recent_conversation_messages(limit)
if not recent_messages:
return ""

# Format messages simply for semantic search
lines = []
for event in recent_messages:
# Simplify the kind label for the query
if "user message" in event.kind:
lines.append(f"User: {event.message}")
elif "agent message" in event.kind:
lines.append(f"Agent: {event.message}")

return "\n".join(lines)

except Exception as e:
logger.warning(f"[MEMORY] Failed to get recent conversation: {e}")
return ""

def get_memory_context(
self, query: Optional[str] = None, top_k: int = 5, session_id: Optional[str] = None
) -> str:
"""Get relevant memories for inclusion in prompts.

Args:
query: Optional query string for memory retrieval. If not provided,
uses current task instruction.
uses current task instruction combined with recent conversation.
top_k: Number of top memories to retrieve.
session_id: Optional session ID for session-specific state lookup.
"""
Expand All @@ -556,21 +624,14 @@ def get_memory_context(
if not _is_memory_enabled():
return ""

if not query:
# Try session-specific state first
session = get_session_or_none(session_id)
if session and session.current_task:
current_task = session.current_task
else:
current_task = get_state().current_task

if current_task:
query = current_task.instruction
else:
return ""
# Build semantic query from task instruction + recent conversation
# This provides better context than using the raw trigger description
memory_query = self._build_memory_query(query, session_id)
if not memory_query:
return ""

try:
pointers = self._memory_manager.retrieve(query, top_k=top_k, min_relevance=0.3)
pointers = self._memory_manager.retrieve(memory_query, top_k=top_k, min_relevance=0.3)

if not pointers:
return ""
Expand Down
14 changes: 13 additions & 1 deletion agent_core/core/impl/mcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,19 @@ async def call_tool(
"message": f"MCP server '{server_name}' connection lost",
}

return await server.call_tool(tool_name, arguments)
result = await server.call_tool(tool_name, arguments)

# Record MCP tool call for metrics (only if not an error)
if result.get("status") != "error":
try:
from app.ui_layer.metrics.collector import MetricsCollector
collector = MetricsCollector.get_instance()
if collector:
collector.record_mcp_tool_call(tool_name, server_name)
except Exception:
pass # Don't fail tool execution if metrics recording fails

return result

async def refresh_tools(self, server_name: Optional[str] = None) -> None:
"""
Expand Down
2 changes: 2 additions & 0 deletions agent_core/core/impl/mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ async def connect(self) -> bool:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=full_env,
limit=10 * 1024 * 1024, # 10MB limit for large MCP responses (e.g., screenshots)
)
else:
self._process = await asyncio.create_subprocess_exec(
Expand All @@ -165,6 +166,7 @@ async def connect(self) -> bool:
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=full_env,
limit=10 * 1024 * 1024, # 10MB limit for large MCP responses (e.g., screenshots)
)
except FileNotFoundError as e:
logger.error(f"[StdioTransport] Command not found: '{command}'. Make sure it is installed and in PATH. Error: {e}")
Expand Down
3 changes: 0 additions & 3 deletions agent_core/core/impl/task/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,6 @@ def create_task(

self.tasks[task_id] = task
self._current_session_id = task_id # CraftBot compatibility
self.db_interface.log_task(task)
self._sync_state_manager(task)

# Notify state manager for two-tier state tracking
Expand Down Expand Up @@ -400,7 +399,6 @@ def _clean_content(s: str) -> str:
transitions.append((item, "pending", "in_progress"))

self.active.todos = new_todos
self.db_interface.log_task(self.active)
self._sync_state_manager(self.active)

# Report transitions via hook if provided (WCA)
Expand Down Expand Up @@ -585,7 +583,6 @@ async def _end_task(
task.final_summary = summary
task.errors = errors or []

self.db_interface.log_task(task)
self._sync_state_manager(task)

self.event_stream_manager.log(
Expand Down
33 changes: 11 additions & 22 deletions agent_core/core/impl/trigger/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,9 +465,8 @@ async def fire(
if t.session_id == session_id:
t.fire_at = time.time()
if message:
t.next_action_description += (
f"\n\n[NEW USER MESSAGE]: {message}"
)
# Store in payload instead of polluting the description
t.payload["pending_user_message"] = message
if platform:
t.payload["pending_platform"] = platform
found = True
Expand All @@ -481,9 +480,8 @@ async def fire(
if session_id in self._active:
t = self._active[session_id]
if message:
t.next_action_description += (
f"\n\n[NEW USER MESSAGE]: {message}"
)
# Store in payload instead of polluting the description
t.payload["pending_user_message"] = message
if platform:
t.payload["pending_platform"] = platform
logger.debug(f"[FIRE] Attached message to active trigger for session {session_id}")
Expand Down Expand Up @@ -528,9 +526,9 @@ def pop_pending_user_message(self, session_id: str) -> tuple[str | None, str | N
"""
Extract and remove any pending user message from an active trigger.

When fire() attaches a message to an active trigger via
'[NEW USER MESSAGE]: ...', this method extracts that message
so it can be carried forward to the next trigger.
When fire() attaches a message to an active trigger's payload,
this method extracts that message so it can be carried forward
to the next trigger.

Args:
session_id: The session to check for pending messages.
Expand All @@ -542,23 +540,14 @@ def pop_pending_user_message(self, session_id: str) -> tuple[str | None, str | N
return None, None

trigger = self._active[session_id]
marker = "\n\n[NEW USER MESSAGE]:"
desc = trigger.next_action_description

if marker not in desc:
return None, None

# Extract the message
idx = desc.index(marker)
message = desc[idx + len(marker):].strip()

# Extract and remove the platform from payload
# Extract and remove the message from payload
message = trigger.payload.pop("pending_user_message", None)
platform = trigger.payload.pop("pending_platform", None)

# Remove the message from the trigger to avoid duplication
trigger.next_action_description = desc[:idx]
if message:
logger.debug(f"[TRIGGER] Extracted pending user message for session {session_id}: {message[:50]}...")

logger.debug(f"[TRIGGER] Extracted pending user message for session {session_id}: {message[:50]}...")
return message, platform

# =================================================================
Expand Down
12 changes: 0 additions & 12 deletions agent_core/core/protocols/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,3 @@ async def execute_action(
Result dictionary with outputs and status.
"""
...

def get_action_history(self, limit: int = 10) -> list:
"""
Get recent action history.

Args:
limit: Maximum number of entries to return.

Returns:
List of action history entries.
"""
...
Loading