diff --git a/agent_core/core/database_interface.py b/agent_core/core/database_interface.py index 69b676a5..98653968 100644 --- a/agent_core/core/database_interface.py +++ b/agent_core/core/database_interface.py @@ -7,17 +7,14 @@ from __future__ import annotations -import asyncio import datetime import json import re -from dataclasses import asdict from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Dict, List, Optional from agent_core.utils.logger import logger -from agent_core.core.task.task import Task from agent_core.core.action_framework.registry import registry_instance from agent_core.core.action_framework.loader import load_actions_from_directories @@ -30,32 +27,27 @@ def __init__( *, data_dir: str = "app/data", chroma_path: str = "./chroma_db", - log_file: Optional[str] = None, ) -> None: """ Initialize storage directories for agent data. - The constructor sets up filesystem paths for logs, actions, task + The constructor sets up filesystem paths for actions, task documents, and agent info. Actions are loaded from directories into the in-memory registry. Args: - data_dir: Base directory used to persist logs and JSON artifacts. + data_dir: Base directory used to persist JSON artifacts. chroma_path: Unused (kept for backward compatibility). - log_file: Optional explicit log file path; defaults to - ``/agent_logs.txt`` when omitted. """ self.data_dir = Path(data_dir) self.data_dir.mkdir(parents=True, exist_ok=True) - self.log_file_path = Path(log_file) if log_file else self.data_dir / "agent_logs.txt" self.actions_dir = self.data_dir / "action" self.task_docs_dir = self.data_dir / "task_document" self.agent_info_path = self.data_dir / "agent_info.json" self.actions_dir.mkdir(parents=True, exist_ok=True) self.task_docs_dir.mkdir(parents=True, exist_ok=True) - self.log_file_path.touch(exist_ok=True) if not self.agent_info_path.exists(): self.agent_info_path.write_text("{}", encoding="utf-8") @@ -67,374 +59,6 @@ def __init__( action_names = [a.get("name") for a in actions if a.get("name")] logger.info(f"Action registry loaded. {len(action_names)} actions available: [{', '.join(sorted(action_names))}]") - # ------------------------------------------------------------------ - # Log helpers - # ------------------------------------------------------------------ - def _load_log_entries(self) -> List[Dict[str, Any]]: - entries: List[Dict[str, Any]] = [] - try: - with self.log_file_path.open("r", encoding="utf-8") as handle: - for line in handle: - line = line.strip() - if not line: - continue - try: - entries.append(json.loads(line)) - except json.JSONDecodeError: - logger.warning(f"[LOG PARSE] Skipping malformed line in {self.log_file_path}") - except FileNotFoundError: - pass - return entries - - def _write_log_entries(self, entries: Iterable[Dict[str, Any]]) -> None: - with self.log_file_path.open("w", encoding="utf-8") as handle: - for entry in entries: - handle.write(json.dumps(entry, default=str) + "\n") - - def _append_log_entry(self, entry: Dict[str, Any]) -> None: - with self.log_file_path.open("a", encoding="utf-8") as handle: - handle.write(json.dumps(entry, default=str) + "\n") - - # ------------------------------------------------------------------ - # Prompt logging & token usage helpers - # ------------------------------------------------------------------ - def log_prompt( - self, - *, - input_data: Dict[str, str], - output: Optional[str], - provider: str, - model: str, - config: Dict[str, Any], - status: str, - token_count_input: Optional[int] = None, - token_count_output: Optional[int] = None, - ) -> None: - """ - Store a single prompt interaction with metadata and token counts. - - Each call appends a structured record to the log file so usage metrics - and model behavior can be inspected later. - - Args: - input_data: Serialized prompt inputs sent to the model provider. - output: The raw model output string, if available. - provider: Name of the LLM provider (e.g., OpenAI, Anthropic). - model: Specific model identifier used for the request. - config: Provider-specific configuration details for the call. - status: Execution status for the prompt (e.g., ``"success"`` or - ``"error"``). - token_count_input: Optional token count for the prompt payload. - token_count_output: Optional token count for the model response. - """ - entry = { - "entry_type": "prompt_log", - "datetime": datetime.datetime.utcnow().isoformat(), - "input": input_data, - "output": output, - "provider": provider, - "model": model, - "config": config, - "status": status, - "token_count_input": token_count_input, - "token_count_output": token_count_output, - } - self._append_log_entry(entry) - - def _iter_prompt_logs(self) -> Iterable[Dict[str, Any]]: - for entry in self._load_log_entries(): - if entry.get("entry_type") == "prompt_log": - yield entry - - # ------------------------------------------------------------------ - # Action history logging - # ------------------------------------------------------------------ - def upsert_action_history( - self, - run_id: str, - *, - session_id: str, - parent_id: str | None, - name: str, - action_type: str, - status: str, - inputs: Dict[str, Any] | None, - outputs: Dict[str, Any] | None, - started_at: str | None, - ended_at: str | None, - ) -> None: - """ - Insert or update an action execution history entry. - - The log is keyed by ``run_id``; repeated writes merge new details while - preserving the initial ``startedAt`` value when absent. - - Args: - run_id: Unique identifier for the action execution instance. - session_id: Identifier for the session that triggered the action. - parent_id: Optional run identifier for the parent action in a tree. - name: Human-readable action name. - action_type: Action type label; duplicated into ``type`` for - backward compatibility. - status: Current execution status. - inputs: Serialized action inputs, if available. - outputs: Serialized action outputs, if available. - started_at: ISO timestamp for when execution began. - ended_at: ISO timestamp for when execution completed. - """ - entries = self._load_log_entries() - payload = { - "entry_type": "action_history", - "runId": run_id, - "sessionId": session_id, - "parentId": parent_id, - "name": name, - "action_type": action_type, - "type": action_type, - "status": status, - "inputs": inputs, - "outputs": outputs, - "startedAt": started_at, - "endedAt": ended_at, - } - - found = False - for entry in entries: - if entry.get("entry_type") == "action_history" and entry.get("runId") == run_id: - entry["action_type"] = payload["action_type"] - entry["type"] = payload["type"] - entry.update({k: v for k, v in payload.items() if v is not None or k in {"inputs", "outputs"}}) - if entry.get("startedAt") is None: - entry["startedAt"] = started_at - found = True - break - - if not found: - if payload["startedAt"] is None: - payload["startedAt"] = datetime.datetime.utcnow().isoformat() - entries.append(payload) - - self._write_log_entries(entries) - - # ------------------------------------------------------------------ - # Fast append-only action logging (for parallel execution) - # ------------------------------------------------------------------ - def log_action_start( - self, - run_id: str, - *, - session_id: str | None, - parent_id: str | None, - name: str, - action_type: str, - inputs: Dict[str, Any] | None, - started_at: str, - ) -> None: - """ - Fast O(1) append for action start - no file read/rewrite. - - This method only appends to the log file, avoiding the O(n) read/search/write - pattern of upsert_action_history. Use this for parallel action execution. - - Args: - run_id: Unique identifier for the action execution instance. - session_id: Identifier for the session that triggered the action. - parent_id: Optional run identifier for the parent action. - name: Human-readable action name. - action_type: Action type label. - inputs: Serialized action inputs. - started_at: ISO timestamp for when execution began. - """ - entry = { - "entry_type": "action_history", - "runId": run_id, - "sessionId": session_id, - "parentId": parent_id, - "name": name, - "action_type": action_type, - "type": action_type, - "status": "running", - "inputs": inputs, - "outputs": None, - "startedAt": started_at, - "endedAt": None, - } - self._append_log_entry(entry) - - def log_action_end( - self, - run_id: str, - *, - outputs: Dict[str, Any] | None, - status: str, - ended_at: str, - ) -> None: - """ - Fast O(1) append for action end - separate entry, no file rewrite. - - This method appends a completion record rather than updating the original - start entry. The get_action_history method merges these records. - - Args: - run_id: Unique identifier for the action execution instance. - outputs: Serialized action outputs. - status: Final execution status (success/error). - ended_at: ISO timestamp for when execution completed. - """ - entry = { - "entry_type": "action_end", - "runId": run_id, - "status": status, - "outputs": outputs, - "endedAt": ended_at, - } - self._append_log_entry(entry) - - async def log_action_start_async( - self, - run_id: str, - *, - session_id: str | None, - parent_id: str | None, - name: str, - action_type: str, - inputs: Dict[str, Any] | None, - started_at: str, - ) -> None: - """Async wrapper for log_action_start - runs file I/O in thread pool.""" - await asyncio.to_thread( - self.log_action_start, - run_id, - session_id=session_id, - parent_id=parent_id, - name=name, - action_type=action_type, - inputs=inputs, - started_at=started_at, - ) - - async def log_action_end_async( - self, - run_id: str, - *, - outputs: Dict[str, Any] | None, - status: str, - ended_at: str, - ) -> None: - """Async wrapper for log_action_end - runs file I/O in thread pool.""" - await asyncio.to_thread( - self.log_action_end, - run_id, - outputs=outputs, - status=status, - ended_at=ended_at, - ) - - def _iter_action_history(self) -> Iterable[Dict[str, Any]]: - for entry in self._load_log_entries(): - if entry.get("entry_type") == "action_history": - yield entry - - def find_actions_by_status(self, status: str) -> List[Dict[str, Any]]: - """ - Return all action history entries matching the given status. - - Args: - status: Status value to filter (e.g., ``"current"`` or ``"pending"``). - - Returns: - List of action history dictionaries where ``status`` matches. - """ - return [entry for entry in self._iter_action_history() if entry.get("status") == status] - - def get_action_history(self, limit: int = 10) -> List[Dict[str, Any]]: - """ - Retrieve recent action history entries ordered by start time. - - This method merges action_history (start) entries with action_end entries - to reconstruct complete action records. This supports the append-only - logging pattern used for parallel execution. - - Args: - limit: Maximum number of entries to return, sorted newest-first. - - Returns: - A list of action history dictionaries truncated to ``limit`` - entries. - """ - starts: Dict[str, Dict[str, Any]] = {} - ends: Dict[str, Dict[str, Any]] = {} - - # Collect start and end entries - for entry in self._load_log_entries(): - entry_type = entry.get("entry_type") - run_id = entry.get("runId") - if not run_id: - continue - - if entry_type == "action_history": - # For duplicate starts (shouldn't happen), keep the latest - starts[run_id] = entry - elif entry_type == "action_end": - ends[run_id] = entry - - # Merge start + end into complete records - history: List[Dict[str, Any]] = [] - for run_id, start in starts.items(): - if run_id in ends: - # Merge end data into start entry - end = ends[run_id] - start["status"] = end.get("status", start.get("status")) - start["outputs"] = end.get("outputs", start.get("outputs")) - start["endedAt"] = end.get("endedAt", start.get("endedAt")) - history.append(start) - - history.sort( - key=lambda e: datetime.datetime.fromisoformat(e.get("startedAt") or datetime.datetime.min.isoformat()), - reverse=True, - ) - return history[:limit] - - # ------------------------------------------------------------------ - # Task logging helpers - # ------------------------------------------------------------------ - def log_task(self, task: Task) -> None: - """ - Persist or update a task log entry for tracking execution progress. - - The task is serialized to JSON-compatible primitives and either - appended to the log or merged with an existing entry for the same task - identifier. - - Args: - task: The :class:`~core.task.task.Task` instance to record. - """ - doc = { - "entry_type": "task_log", - "task_id": task.id, - "name": task.name, - "instruction": task.instruction, - "todos": [asdict(todo) for todo in task.todos], - "created_at": task.created_at, - "status": task.status, - "updated_at": datetime.datetime.utcnow().isoformat(), - } - - entries = self._load_log_entries() - for entry in entries: - if entry.get("entry_type") == "task_log" and entry.get("task_id") == task.id: - entry.update(doc) - break - else: - entries.append(doc) - - self._write_log_entries(entries) - - def _iter_task_logs(self) -> Iterable[Dict[str, Any]]: - for entry in self._load_log_entries(): - if entry.get("entry_type") == "task_log": - yield entry - # ------------------------------------------------------------------ # Action definitions (filesystem + Chroma) # ------------------------------------------------------------------ @@ -594,56 +218,3 @@ def _load_task_documents_from_disk(self) -> List[Dict[str, Any]]: } ) return docs - - # ------------------------------------------------------------------ - # Task helpers (for recovery) - # ------------------------------------------------------------------ - def find_current_task_steps(self) -> List[Dict[str, Any]]: - """ - List steps across all tasks that are marked as ``current``. - - Returns: - A list of dictionaries pairing ``task_id`` with the active step - metadata. - """ - results: List[Dict[str, Any]] = [] - for entry in self._iter_task_logs(): - task_id = entry.get("task_id") - for step in entry.get("steps", []): - if step.get("status") == "current": - results.append({"task_id": task_id, "step": step}) - return results - - def update_step_status( - self, - task_id: str, - action_id: str, - status: str, - failure_message: Optional[str] = None, - ) -> None: - """ - Update the status of a task step and persist the change. - - Args: - task_id: Identifier for the task owning the step. - action_id: The step's action identifier used to locate it. - status: New status string to assign to the step. - failure_message: Optional failure detail to attach when updating. - """ - entries = self._load_log_entries() - updated = False - for entry in entries: - if entry.get("entry_type") != "task_log" or entry.get("task_id") != task_id: - continue - for step in entry.get("steps", []): - if step.get("action_id") == action_id: - step["status"] = status - if failure_message is not None: - step["failure_message"] = failure_message - updated = True - break - if updated: - entry["updated_at"] = datetime.datetime.utcnow().isoformat() - break - if updated: - self._write_log_entries(entries) diff --git a/agent_core/core/impl/action/manager.py b/agent_core/core/impl/action/manager.py index 54152a26..46645263 100644 --- a/agent_core/core/impl/action/manager.py +++ b/agent_core/core/impl/action/manager.py @@ -180,17 +180,6 @@ async def execute_action( if not parent_id and self._get_parent_id: parent_id = self._get_parent_id() - # Persist RUNNING status using fast append-only logging - await self.db_interface.log_action_start_async( - run_id=run_id, - session_id=session_id, - parent_id=parent_id, - name=action.name, - action_type=action.action_type, - inputs=input_data, - started_at=started_at, - ) - # Call on_action_start hook if provided if self._on_action_start: try: @@ -330,14 +319,6 @@ async def execute_action( state.get_agent_property("action_count", 0) + 1 ) - # Persist final status using fast append-only logging - await self.db_interface.log_action_end_async( - run_id=run_id, - outputs=outputs, - status=status, - ended_at=ended_at, - ) - # Call on_action_end hook if provided if self._on_action_end: try: @@ -456,34 +437,6 @@ async def execute_single(action: Action, input_data: Dict, action_session_id: st # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ - - def _log_action_history( - self, - *, - run_id: str, - action: Action, - inputs: Optional[Dict], - outputs: Optional[Dict], - status: str, - started_at: Optional[str], - ended_at: Optional[str], - parent_id: Optional[str], - session_id: Optional[str], - ) -> None: - """Upsert a single history document keyed by *runId*.""" - self.db_interface.upsert_action_history( - run_id, - session_id=session_id, - parent_id=parent_id, - name=action.name, - action_type=action.action_type, - status=status, - inputs=inputs, - outputs=outputs, - started_at=started_at, - ended_at=ended_at, - ) - def _log_event_stream( self, is_gui_task: bool, @@ -638,19 +591,3 @@ async def run_observe_step(self, action: Action, action_output: Dict) -> Dict[st attempt += 1 return {"success": False, "message": "Observation failed or timed out."} - - # ------------------------------------------------------------------ - # Helper - # ------------------------------------------------------------------ - - def get_action_history(self, limit: int = 10) -> List[Dict[str, Any]]: - """ - Retrieve recent action history entries. - - Args: - limit: Maximum number of history documents to return. - - Returns: - List[Dict[str, Any]]: Collection of run metadata. - """ - return self.db_interface.get_action_history(limit) diff --git a/agent_core/core/impl/context/engine.py b/agent_core/core/impl/context/engine.py index 1582e85a..63a01c05 100644 --- a/agent_core/core/impl/context/engine.py +++ b/agent_core/core/impl/context/engine.py @@ -538,6 +538,74 @@ def get_user_info(self) -> str: """Get current user info for user prompts (WCA-specific via hook).""" return self._get_user_info() + def _build_memory_query(self, query: Optional[str], session_id: Optional[str]) -> Optional[str]: + """Build a semantic query for memory retrieval. + + Combines task instruction with recent conversation messages (both user + and agent) to provide better context for memory search. + + Args: + query: Optional explicit query string. + session_id: Optional session ID for session-specific state lookup. + + Returns: + A query string suitable for semantic memory search, or None if no context. + """ + # Get task instruction as the base query + session = get_session_or_none(session_id) + if session and session.current_task: + task_instruction = session.current_task.instruction + else: + current_task = get_state().current_task + task_instruction = current_task.instruction if current_task else None + + if not task_instruction: + # Fall back to explicit query if no task + return query if query else None + + # Get recent conversation messages for additional context + recent_context = self._get_recent_conversation_for_memory(session_id, limit=5) + + if recent_context: + return f"{task_instruction}\n\nRecent conversation:\n{recent_context}" + else: + return task_instruction + + def _get_recent_conversation_for_memory(self, session_id: Optional[str], limit: int = 5) -> str: + """Get recent conversation messages for memory query context. + + Args: + session_id: Optional session ID for session-specific event stream. + limit: Maximum number of messages to include. + + Returns: + Formatted string of recent user and agent messages. + """ + try: + event_stream_manager = self.state_manager.event_stream_manager + if not event_stream_manager: + return "" + + # Get messages from conversation history (includes both user and agent) + recent_messages = event_stream_manager.get_recent_conversation_messages(limit) + if not recent_messages: + return "" + + # Format messages simply for semantic search + lines = [] + for event in recent_messages: + # Simplify the kind label for the query + if "user message" in event.kind: + lines.append(f"User: {event.message}") + elif "agent message" in event.kind: + lines.append(f"Agent: {event.message}") + + return "\n".join(lines) + + except Exception as e: + logger.warning(f"[MEMORY] Failed to get recent conversation: {e}") + return "" + def get_memory_context( self, query: Optional[str] = None, top_k: int = 5, session_id: Optional[str] = None ) -> str: @@ -545,7 +613,7 @@ def get_memory_context( Args: query: Optional query string for memory retrieval. If not provided, - uses current task instruction. + uses current task instruction combined with recent conversation. top_k: Number of top memories to retrieve. session_id: Optional session ID for session-specific state lookup. """ @@ -556,21 +624,14 @@ def get_memory_context( if not _is_memory_enabled(): return "" - if not query: - # Try session-specific state first - session = get_session_or_none(session_id) - if session and session.current_task: - current_task = session.current_task - else: - current_task = get_state().current_task - - if current_task: - query = current_task.instruction - else: - return "" + # Build semantic query from task instruction + recent conversation + # This provides better context than using the raw trigger description + memory_query = self._build_memory_query(query, session_id) + if not memory_query: + return "" try: - pointers = self._memory_manager.retrieve(query, top_k=top_k, min_relevance=0.3) + pointers = self._memory_manager.retrieve(memory_query, top_k=top_k, min_relevance=0.3) if not pointers: return "" diff --git a/agent_core/core/impl/mcp/client.py b/agent_core/core/impl/mcp/client.py index 1831e194..c580c7cf 100644 --- a/agent_core/core/impl/mcp/client.py +++ b/agent_core/core/impl/mcp/client.py @@ -249,7 +249,19 @@ async def call_tool( "message": f"MCP server '{server_name}' connection lost", } - return await server.call_tool(tool_name, arguments) + result = await server.call_tool(tool_name, arguments) + + # Record MCP tool call for metrics (only if not an error) + if result.get("status") != "error": + try: + from app.ui_layer.metrics.collector import MetricsCollector + collector = MetricsCollector.get_instance() + if collector: + collector.record_mcp_tool_call(tool_name, server_name) + except Exception: + pass # Don't fail tool execution if metrics recording fails + + return result async def refresh_tools(self, server_name: Optional[str] = None) -> None: """ diff --git a/agent_core/core/impl/mcp/server.py b/agent_core/core/impl/mcp/server.py index fe8c9f6f..42ac7fea 100644 --- a/agent_core/core/impl/mcp/server.py +++ b/agent_core/core/impl/mcp/server.py @@ -156,6 +156,7 @@ async def connect(self) -> bool: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=full_env, + limit=10 * 1024 * 1024, # 10MB limit for large MCP responses (e.g., screenshots) ) else: self._process = await asyncio.create_subprocess_exec( @@ -165,6 +166,7 @@ async def connect(self) -> bool: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=full_env, + limit=10 * 1024 * 1024, # 10MB limit for large MCP responses (e.g., screenshots) ) except FileNotFoundError as e: logger.error(f"[StdioTransport] Command not found: '{command}'. Make sure it is installed and in PATH. Error: {e}") diff --git a/agent_core/core/impl/task/manager.py b/agent_core/core/impl/task/manager.py index f6dd99a1..89156266 100644 --- a/agent_core/core/impl/task/manager.py +++ b/agent_core/core/impl/task/manager.py @@ -271,7 +271,6 @@ def create_task( self.tasks[task_id] = task self._current_session_id = task_id # CraftBot compatibility - self.db_interface.log_task(task) self._sync_state_manager(task) # Notify state manager for two-tier state tracking @@ -400,7 +399,6 @@ def _clean_content(s: str) -> str: transitions.append((item, "pending", "in_progress")) self.active.todos = new_todos - self.db_interface.log_task(self.active) self._sync_state_manager(self.active) # Report transitions via hook if provided (WCA) @@ -585,7 +583,6 @@ async def _end_task( task.final_summary = summary task.errors = errors or [] - self.db_interface.log_task(task) self._sync_state_manager(task) self.event_stream_manager.log( diff --git a/agent_core/core/impl/trigger/queue.py b/agent_core/core/impl/trigger/queue.py index 09bd0ffe..509c8f44 100644 --- a/agent_core/core/impl/trigger/queue.py +++ b/agent_core/core/impl/trigger/queue.py @@ -465,9 +465,8 @@ async def fire( if t.session_id == session_id: t.fire_at = time.time() if message: - t.next_action_description += ( - f"\n\n[NEW USER MESSAGE]: {message}" - ) + # Store in payload instead of polluting the description + t.payload["pending_user_message"] = message if platform: t.payload["pending_platform"] = platform found = True @@ -481,9 +480,8 @@ async def fire( if session_id in self._active: t = self._active[session_id] if message: - t.next_action_description += ( - f"\n\n[NEW USER MESSAGE]: {message}" - ) + # Store in payload instead of polluting the description + t.payload["pending_user_message"] = message if platform: t.payload["pending_platform"] = platform logger.debug(f"[FIRE] Attached message to active trigger for session {session_id}") @@ -528,9 +526,9 @@ def pop_pending_user_message(self, session_id: str) -> tuple[str | None, str | N """ Extract and remove any pending user message from an active trigger. - When fire() attaches a message to an active trigger via - '[NEW USER MESSAGE]: ...', this method extracts that message - so it can be carried forward to the next trigger. + When fire() attaches a message to an active trigger's payload, + this method extracts that message so it can be carried forward + to the next trigger. Args: session_id: The session to check for pending messages. @@ -542,23 +540,14 @@ def pop_pending_user_message(self, session_id: str) -> tuple[str | None, str | N return None, None trigger = self._active[session_id] - marker = "\n\n[NEW USER MESSAGE]:" - desc = trigger.next_action_description - - if marker not in desc: - return None, None - - # Extract the message - idx = desc.index(marker) - message = desc[idx + len(marker):].strip() - # Extract and remove the platform from payload + # Extract and remove the message from payload + message = trigger.payload.pop("pending_user_message", None) platform = trigger.payload.pop("pending_platform", None) - # Remove the message from the trigger to avoid duplication - trigger.next_action_description = desc[:idx] + if message: + logger.debug(f"[TRIGGER] Extracted pending user message for session {session_id}: {message[:50]}...") - logger.debug(f"[TRIGGER] Extracted pending user message for session {session_id}: {message[:50]}...") return message, platform # ================================================================= diff --git a/agent_core/core/protocols/action.py b/agent_core/core/protocols/action.py index ee30e484..8d1cb2eb 100644 --- a/agent_core/core/protocols/action.py +++ b/agent_core/core/protocols/action.py @@ -242,15 +242,3 @@ async def execute_action( Result dictionary with outputs and status. """ ... - - def get_action_history(self, limit: int = 10) -> list: - """ - Get recent action history. - - Args: - limit: Maximum number of entries to return. - - Returns: - List of action history entries. - """ - ... diff --git a/agent_core/core/protocols/database.py b/agent_core/core/protocols/database.py index 4cd8c98a..1af74166 100644 --- a/agent_core/core/protocols/database.py +++ b/agent_core/core/protocols/database.py @@ -7,10 +7,7 @@ typing for database operations across different agent implementations. """ -from typing import Any, Dict, List, Optional, Protocol, TYPE_CHECKING - -if TYPE_CHECKING: - from agent_core import Task +from typing import Any, Dict, List, Optional, Protocol class DatabaseInterfaceProtocol(Protocol): @@ -21,150 +18,72 @@ class DatabaseInterfaceProtocol(Protocol): must provide for use by shared agent code. """ - def log_task(self, task: "Task") -> None: - """ - Persist or update a task log entry. - - Args: - task: The Task instance to record. - """ - ... - - def upsert_action_history( + def list_actions( self, - run_id: str, *, - session_id: str, - parent_id: Optional[str], - name: str, - action_type: str, - status: str, - inputs: Optional[Dict[str, Any]], - outputs: Optional[Dict[str, Any]], - started_at: Optional[str], - ended_at: Optional[str], - ) -> None: + default: Optional[bool] = None, + ) -> List[Dict[str, Any]]: """ - Insert or update an action execution history entry. + Return stored actions optionally filtered by the default flag. Args: - run_id: Unique identifier for the action execution. - session_id: Session that triggered the action. - parent_id: Optional parent action identifier. - name: Human-readable action name. - action_type: Action type label. - status: Current execution status. - inputs: Serialized action inputs. - outputs: Serialized action outputs. - started_at: ISO timestamp for execution start. - ended_at: ISO timestamp for execution end. + default: When provided, only return actions whose default field + matches the boolean value. + + Returns: + List of action dictionaries that satisfy the filter. """ ... - async def log_action_start_async( - self, - run_id: str, - *, - session_id: Optional[str], - parent_id: Optional[str], - name: str, - action_type: str, - inputs: Optional[Dict[str, Any]], - started_at: str, - ) -> None: + def get_action(self, name: str) -> Optional[Dict[str, Any]]: """ - Fast O(1) append for action start (async version). + Fetch a stored action by name. Args: - run_id: Unique identifier for the action execution. - session_id: Session that triggered the action. - parent_id: Optional parent action identifier. - name: Human-readable action name. - action_type: Action type label. - inputs: Serialized action inputs. - started_at: ISO timestamp for execution start. + name: The human-readable name used to identify the action. + + Returns: + The action dictionary when found, otherwise None. """ ... - async def log_action_end_async( - self, - run_id: str, - *, - outputs: Optional[Dict[str, Any]], - status: str, - ended_at: str, - ) -> None: + def store_action(self, action_dict: Dict[str, Any]) -> None: """ - Fast O(1) append for action end (async version). + Persist an action definition to disk. Args: - run_id: Unique identifier for the action execution. - outputs: Serialized action outputs. - status: Final execution status. - ended_at: ISO timestamp for execution end. + action_dict: Action payload to store, expected to include a name + field used for the filename. """ ... - def get_action_history(self, limit: int = 10) -> List[Dict[str, Any]]: + def delete_action(self, name: str) -> None: """ - Retrieve recent action history entries. + Remove an action definition from disk. Args: - limit: Maximum number of entries to return. - - Returns: - List of action history dictionaries. + name: Name of the action to delete. """ ... - def find_actions_by_status(self, status: str) -> List[Dict[str, Any]]: + def set_agent_info(self, info: Dict[str, Any], key: str = "singleton") -> None: """ - Return all action history entries matching the given status. + Persist arbitrary agent configuration under the provided key. Args: - status: Status value to filter. - - Returns: - List of matching action history dictionaries. + info: Mapping of configuration fields to store. + key: Logical namespace under which the configuration is saved. """ ... - def search_actions(self, query: str, top_k: int = 7) -> List[str]: + def get_agent_info(self, key: str = "singleton") -> Optional[Dict[str, Any]]: """ - Search actions by semantic similarity. + Load persisted agent configuration for the given key. Args: - query: Search query string. - top_k: Maximum number of results. + key: Namespace key used when persisting the configuration. Returns: - List of action names matching the query. - """ - ... - - def log_prompt( - self, - *, - input_data: Dict[str, str], - output: Optional[str], - provider: str, - model: str, - config: Dict[str, Any], - status: str, - token_count_input: Optional[int] = None, - token_count_output: Optional[int] = None, - ) -> None: - """ - Store a prompt interaction with metadata. - - Args: - input_data: Serialized prompt inputs. - output: The model output string. - provider: Name of the LLM provider. - model: Model identifier used. - config: Provider-specific configuration. - status: Execution status. - token_count_input: Token count for prompt. - token_count_output: Token count for response. + A configuration dictionary when present, otherwise None. """ ... diff --git a/agent_core/core/registry/database.py b/agent_core/core/registry/database.py index 6659d03d..ab04e20d 100644 --- a/agent_core/core/registry/database.py +++ b/agent_core/core/registry/database.py @@ -15,7 +15,7 @@ # In shared code: db = DatabaseRegistry.get() - db.log_task(task) + db.list_actions() """ from typing import TYPE_CHECKING diff --git a/app/agent_base.py b/app/agent_base.py index 6e1934db..618aa4e7 100644 --- a/app/agent_base.py +++ b/app/agent_base.py @@ -149,7 +149,6 @@ def __init__( provider=llm_provider, api_key=llm_api_key, base_url=llm_base_url, - db_interface=self.db_interface, deferred=deferred_init, ) self.vlm = VLMInterface( @@ -582,21 +581,17 @@ def _extract_trigger_data(self, trigger: Trigger) -> TriggerData: ) def _extract_user_message_from_trigger(self, trigger: Trigger) -> Optional[str]: - """Extract user message that was appended by triggers.fire(). + """Extract user message that was stored by triggers.fire(). When a message is routed to an existing session, the fire() method - appends it as '[NEW USER MESSAGE]: {message}' to next_action_description. - This message needs to be recorded to the event stream so the LLM can see it. + stores it in the trigger's payload. This message needs to be recorded + to the event stream so the LLM can see it. Returns: The user message if found, None otherwise. """ - marker = "[NEW USER MESSAGE]:" - desc = trigger.next_action_description - if marker in desc: - idx = desc.index(marker) + len(marker) - return desc[idx:].strip() - return None + payload = trigger.payload or {} + return payload.get("pending_user_message") async def _initialize_session(self, gui_mode: bool | None, session_id: str) -> None: """Initialize the agent session and set current task ID. @@ -1308,15 +1303,16 @@ async def _create_new_trigger(self, new_session_id, action_output, STATE): # Check if there's a pending user message from fire() that needs to be carried forward pending_message, pending_platform = self.triggers.pop_pending_user_message(new_session_id) - if pending_message: - next_action_desc = f"Perform the next best action for the task based on the todos and event stream\n\n[NEW USER MESSAGE]: {pending_message}" - else: - next_action_desc = "Perform the next best action for the task based on the todos and event stream" - # Build payload with platform if available + # Keep description clean - pending messages go in payload + next_action_desc = "Perform the next best action for the task based on the todos and event stream" + + # Build payload - carry forward pending message if present trigger_payload = {"gui_mode": STATE.gui_mode} + if pending_message: + trigger_payload["pending_user_message"] = pending_message if pending_platform: - trigger_payload["platform"] = pending_platform + trigger_payload["pending_platform"] = pending_platform # Build and enqueue trigger safely try: diff --git a/app/internal_action_interface.py b/app/internal_action_interface.py index 828a6a94..d6cf9778 100644 --- a/app/internal_action_interface.py +++ b/app/internal_action_interface.py @@ -652,6 +652,17 @@ async def _select_skills_and_action_sets_via_llm( logger.info(f"[TASK] LLM response: skills={selected_skills}, action_sets={selected_sets}") logger.info(f"[TASK] Valid selection: skills={valid_skills}, action_sets={valid_sets}") + # Record skill selection for metrics (skill is "invoked" when selected for prompt) + if valid_skills: + try: + from app.ui_layer.metrics.collector import MetricsCollector + collector = MetricsCollector.get_instance() + if collector: + for skill_name in valid_skills: + collector.record_skill_invocation(skill_name) + except Exception: + pass # Don't fail skill selection if metrics recording fails + return valid_skills, valid_sets except json.JSONDecodeError as e: diff --git a/app/llm/interface.py b/app/llm/interface.py index 3daf3c18..b21fa5a8 100644 --- a/app/llm/interface.py +++ b/app/llm/interface.py @@ -43,42 +43,10 @@ def __init__( model: Optional[str] = None, api_key: Optional[str] = None, base_url: Optional[str] = None, - db_interface: Optional[Any] = None, temperature: float = 0.0, max_tokens: int = 8000, deferred: bool = False, ) -> None: - # Create log_to_db hook if db_interface provided - log_to_db = None - if db_interface: - def _log_to_db( - system_prompt: Optional[str], - user_prompt: str, - output: str, - status: str, - token_count_input: int, - token_count_output: int, - ) -> None: - input_data: Dict[str, Optional[str]] = { - "system_prompt": system_prompt, - "user_prompt": user_prompt, - } - config: Dict[str, Any] = { - "temperature": self.temperature, - "max_tokens": self.max_tokens, - } - db_interface.log_prompt( - input_data=input_data, - output=output, - provider=self.provider, - model=self.model, - config=config, - status=status, - token_count_input=token_count_input, - token_count_output=token_count_output, - ) - log_to_db = _log_to_db - super().__init__( provider=provider, model=model, @@ -90,8 +58,4 @@ def _log_to_db( get_token_count=_get_token_count, set_token_count=_set_token_count, report_usage=_report_usage, # Report usage to local SQLite storage - log_to_db=log_to_db, ) - - # Store db_interface reference for compatibility - self.db_interface = db_interface diff --git a/app/llm_interface.py b/app/llm_interface.py index 58419dac..efeefc01 100644 --- a/app/llm_interface.py +++ b/app/llm_interface.py @@ -2185,41 +2185,6 @@ def _generate_anthropic( "cached_tokens": cached_tokens, } - # ─────────────────── Internal utilities ─────────────────── - @profile("llm_log_to_db", OperationCategory.DATABASE) - def _log_to_db( - self, - system_prompt: str | None, - user_prompt: str, - output: str, - status: str, - token_count_input: int, - token_count_output: int, - ) -> None: - """Persist prompt/response metadata using the optional `db_interface`.""" - if not self.db_interface: - return - - input_data: Dict[str, Optional[str]] = { - "system_prompt": system_prompt, - "user_prompt": user_prompt, - } - config: Dict[str, Any] = { - "temperature": self.temperature, - "max_tokens": self.max_tokens, - } - - self.db_interface.log_prompt( - input_data=input_data, - output=output, - provider=self.provider, - model=self.model, - config=config, - status=status, - token_count_input=token_count_input, - token_count_output=token_count_output, - ) - # ─────────────────── CLI helper for ad‑hoc testing ─────────────────── def _cli(self) -> None: # pragma: no cover """Run a quick interactive shell for manual testing.""" diff --git a/app/ui_layer/browser/frontend/src/pages/Chat/ChatPage.tsx b/app/ui_layer/browser/frontend/src/pages/Chat/ChatPage.tsx index bc92cf7d..1a1d9f64 100644 --- a/app/ui_layer/browser/frontend/src/pages/Chat/ChatPage.tsx +++ b/app/ui_layer/browser/frontend/src/pages/Chat/ChatPage.tsx @@ -488,7 +488,7 @@ export function ChatPage() { > {task.name} - {task.status === 'running' && ( + {(task.status === 'running' || task.status === 'waiting') && (

MCP Servers

- 0 ? 'success' : 'default'}> - {mcpConnectedServers}/{mcpTotalServers} -
- - {mcpTotalTools} - Tools + + {mcpConnectedServers} + Connected
- + {mcpTotalCalls} Calls
@@ -517,9 +514,6 @@ export function DashboardPage() {

Skills

- 0 ? 'success' : 'default'}> - {skillEnabled}/{skillTotal} -
diff --git a/app/ui_layer/browser/frontend/src/pages/Tasks/TasksPage.tsx b/app/ui_layer/browser/frontend/src/pages/Tasks/TasksPage.tsx index 7df6f7b6..646eac2b 100644 --- a/app/ui_layer/browser/frontend/src/pages/Tasks/TasksPage.tsx +++ b/app/ui_layer/browser/frontend/src/pages/Tasks/TasksPage.tsx @@ -449,7 +449,7 @@ export function TasksPage() {
{selectedItem.itemType === 'task' && (
- {selectedItem.status === 'running' ? ( + {(selectedItem.status === 'running' || selectedItem.status === 'waiting') ? (