From 9724f73e3ade57b3ea59d6e6958927615f359394 Mon Sep 17 00:00:00 2001 From: zfoong Date: Tue, 7 Apr 2026 17:12:50 +0900 Subject: [PATCH] feature:implemented tasks and event stream persistent --- agent_core/core/event_stream/event.py | 51 ++- agent_core/core/impl/event_stream/manager.py | 10 +- agent_core/core/impl/task/manager.py | 42 ++- agent_core/core/impl/trigger/queue.py | 3 +- app/agent_base.py | 246 ++++++++++++- app/config/settings.json | 2 +- app/data/agent_file_system_template/SOUL.md | 23 +- app/task/task_manager.py | 23 ++ app/ui_layer/adapters/browser_adapter.py | 7 +- app/ui_layer/controller/ui_controller.py | 14 + app/usage/action_storage.py | 26 +- app/usage/session_storage.py | 349 +++++++++++++++++++ 12 files changed, 761 insertions(+), 35 deletions(-) create mode 100644 app/usage/session_storage.py diff --git a/agent_core/core/event_stream/event.py b/agent_core/core/event_stream/event.py index e39ba169..59aa3160 100644 --- a/agent_core/core/event_stream/event.py +++ b/agent_core/core/event_stream/event.py @@ -24,7 +24,7 @@ from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Optional, List +from typing import Any, Dict, Optional, List SEVERITIES = ("DEBUG", "INFO", "WARN", "ERROR") @@ -64,6 +64,32 @@ def display_text(self) -> Optional[str]: """ return self.display_message + def to_dict(self) -> Dict[str, Any]: + """Serialize the event to a dictionary for persistence.""" + return { + "message": self.message, + "kind": self.kind, + "severity": self.severity, + "display_message": self.display_message, + "ts": self.ts.isoformat(), + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "Event": + """Deserialize an event from a dictionary.""" + ts = ( + datetime.fromisoformat(data["ts"]) + if isinstance(data.get("ts"), str) + else datetime.now(timezone.utc) + ) + return cls( + message=data["message"], + kind=data["kind"], + severity=data["severity"], + 
display_message=data.get("display_message"), + ts=ts, + ) + @property def iso_ts(self) -> str: """ @@ -92,6 +118,29 @@ class EventRecord: repeat_count: int = 1 _cached_tokens: int | None = field(default=None, repr=False) + def to_dict(self) -> Dict[str, Any]: + """Serialize the event record to a dictionary for persistence.""" + return { + "event": self.event.to_dict(), + "ts": self.ts.isoformat(), + "repeat_count": self.repeat_count, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "EventRecord": + """Deserialize an event record from a dictionary.""" + event = Event.from_dict(data["event"]) + ts = ( + datetime.fromisoformat(data["ts"]) + if isinstance(data.get("ts"), str) + else datetime.now(timezone.utc) + ) + return cls( + event=event, + ts=ts, + repeat_count=data.get("repeat_count", 1), + ) + def compact_line(self) -> str: """ Generate a compact single-line representation of this event. diff --git a/agent_core/core/impl/event_stream/manager.py b/agent_core/core/impl/event_stream/manager.py index 27e73ba4..89284f4b 100644 --- a/agent_core/core/impl/event_stream/manager.py +++ b/agent_core/core/impl/event_stream/manager.py @@ -15,7 +15,7 @@ from __future__ import annotations from datetime import datetime, timezone from pathlib import Path -from typing import Dict, List, Optional +from typing import Callable, Dict, List, Optional import threading from agent_core.core.impl.event_stream.event_stream import EventStream @@ -64,7 +64,9 @@ class EventStreamManager: def __init__( self, llm: LLMInterfaceProtocol, - agent_file_system_path: Optional[Path] = None + agent_file_system_path: Optional[Path] = None, + on_stream_persist: Optional[Callable[[str, "EventStream"], None]] = None, + on_stream_remove_persist: Optional[Callable[[str], None]] = None, ) -> None: # Main stream for conversation mode (not task-specific) self._main_stream: EventStream = EventStream(llm=llm, temp_dir=None) @@ -77,6 +79,10 @@ def __init__( self._skip_unprocessed_logging = False 
self._file_lock = threading.Lock() + # Session persistence hooks + self._on_stream_persist = on_stream_persist + self._on_stream_remove_persist = on_stream_remove_persist + # Conversation history for context injection into tasks # Stores recent user AND agent messages without affecting TUI display self._conversation_history: List[Event] = [] diff --git a/agent_core/core/impl/task/manager.py b/agent_core/core/impl/task/manager.py index 9e3175b1..a83b60a7 100644 --- a/agent_core/core/impl/task/manager.py +++ b/agent_core/core/impl/task/manager.py @@ -58,6 +58,10 @@ OnStreamCreateHook = Callable[[str, Path], None] # (task_id, temp_dir) OnStreamRemoveHook = Callable[[str], None] # (task_id) +# Session persistence hooks +OnTaskPersistHook = Callable[["Task"], None] # (task) +OnTaskRemovePersistHook = Callable[[str], None] # (task_id) + # Chatserver hooks (WCA only) OnTaskCreatedChatserverHook = Callable[[Task], None] OnTodoTransitionHook = Callable[[List[tuple]], None] # List of (todo, old_status, new_status) @@ -94,6 +98,9 @@ def __init__( # Event stream hooks on_stream_create: Optional[OnStreamCreateHook] = None, on_stream_remove: Optional[OnStreamRemoveHook] = None, + # Session persistence hooks + on_task_persist: Optional[OnTaskPersistHook] = None, + on_task_remove_persist: Optional[OnTaskRemovePersistHook] = None, # Chatserver hooks (WCA only) on_task_created_chatserver: Optional[OnTaskCreatedChatserverHook] = None, on_todo_transition: Optional[OnTodoTransitionHook] = None, @@ -124,6 +131,10 @@ def __init__( on_stream_create: Called to set up event stream for task. on_stream_remove: Called to clean up event stream on task end. + Session persistence hooks: + on_task_persist: Called on every task state change to persist task to disk. + on_task_remove_persist: Called when task ends to remove persisted data. + Chatserver hooks (WCA only): on_task_created_chatserver: POST task to chatserver. on_todo_transition: Report todo transitions to chatserver. 
@@ -156,6 +167,10 @@ def __init__( self._on_stream_create = on_stream_create self._on_stream_remove = on_stream_remove + # Session persistence hooks + self._on_task_persist = on_task_persist + self._on_task_remove_persist = on_task_remove_persist + # Chatserver hooks (WCA only, default to None/no-op) self._on_task_created_chatserver = on_task_created_chatserver self._on_todo_transition = on_todo_transition @@ -616,6 +631,13 @@ async def _end_task( if self._current_session_id == task.id: self._current_session_id = None + # Remove persisted session data (task + event stream) + if self._on_task_remove_persist: + try: + self._on_task_remove_persist(task.id) + except Exception as e: + logger.warning(f"[TaskManager] Failed to remove persisted task {task.id}: {e}") + # Clean up session-specific state (multi-task isolation) StateSession.end(task.id) @@ -658,9 +680,15 @@ async def _end_task( logger.warning(f"[ONBOARDING] Failed to mark soft onboarding complete: {e}") def _sync_state_manager(self, task: Optional[Task]) -> None: - """Sync task state to the state manager.""" + """Sync task state to the state manager and persist to disk.""" if self.state_manager: self.state_manager.add_to_active_task(task=task) + # Persist task state for crash recovery + if task and self._on_task_persist: + try: + self._on_task_persist(task) + except Exception as e: + logger.warning(f"[TaskManager] Failed to persist task {task.id}: {e}") def _log_to_task_history(self, task: Task, note: Optional[str] = None) -> None: """Log completed task to TASK_HISTORY.md.""" @@ -729,16 +757,22 @@ def _cleanup_task_temp_dir(self, task: Task) -> None: except Exception: logger.warning(f"[TaskManager] Failed to clean temp dir for {task.id}", exc_info=True) - def cleanup_all_temp_dirs(self) -> int: - """Remove all temporary directories in workspace/tmp/.""" + def cleanup_all_temp_dirs(self, exclude: Optional[set] = None) -> int: + """Remove temporary directories in workspace/tmp/, optionally excluding some. 
+ + Args: + exclude: Set of task IDs whose temp directories should be preserved + (e.g., restored tasks that need their workspace). + """ temp_root = self.workspace_root / "tmp" if not temp_root.exists(): return 0 + exclude = exclude or set() cleaned_count = 0 try: for item in temp_root.iterdir(): - if item.is_dir(): + if item.is_dir() and item.name not in exclude: try: shutil.rmtree(item, ignore_errors=True) cleaned_count += 1 diff --git a/agent_core/core/impl/trigger/queue.py b/agent_core/core/impl/trigger/queue.py index 3ef3d12e..817399aa 100644 --- a/agent_core/core/impl/trigger/queue.py +++ b/agent_core/core/impl/trigger/queue.py @@ -156,7 +156,7 @@ def create_task_state(self) -> str: async def clear(self) -> None: """ - Remove all pending triggers from the queue. + Remove all pending and active triggers from the queue. The queue is cleared under the protection of the condition variable so waiting consumers are notified immediately that the queue state has @@ -164,6 +164,7 @@ async def clear(self) -> None: """ async with self._cv: self._heap.clear() + self._active.clear() self._cv.notify_all() # ================================================================= diff --git a/app/agent_base.py b/app/agent_base.py index f031b9ac..1a34aa3d 100644 --- a/app/agent_base.py +++ b/app/agent_base.py @@ -60,6 +60,7 @@ from app.trigger import Trigger, TriggerQueue from app.prompt import ROUTE_TO_SESSION_PROMPT from app.state.types import ReasoningResult +from agent_core.core.task import Task from app.task.task_manager import TaskManager from app.event_stream import EventStreamManager from app.gui.gui_module import GUIModule @@ -161,7 +162,7 @@ def __init__( self.event_stream_manager = EventStreamManager( self.llm, - agent_file_system_path=AGENT_FILE_SYSTEM_PATH + agent_file_system_path=AGENT_FILE_SYSTEM_PATH, ) # action & task layers @@ -199,8 +200,13 @@ def __init__( self.triggers.set_task_manager(self.task_manager) 
self.triggers.set_event_stream_manager(self.event_stream_manager) - # Clean up any leftover temp directories from previous runs - self.task_manager.cleanup_all_temp_dirs() + # Set _interface_mode early so context_engine.make_prompt() works during restore + # (will be updated again in run() based on selected interface) + self._interface_mode: str = "tui" + + # Restore active sessions from previous run, then clean up leftover temp dirs + self._restored_task_ids = self._restore_sessions() + self.task_manager.cleanup_all_temp_dirs(exclude=self._restored_task_ids) # ── memory manager for proactive agent ── self.memory_manager = MemoryManager( @@ -268,7 +274,6 @@ def __init__( # ── misc ── self.is_running: bool = True - self._interface_mode: str = "tui" # Will be updated in run() based on selected interface self.ui_controller = None # Set by interface after UIController is created self._extra_system_prompt: str = self._load_extra_system_prompt() @@ -2001,6 +2006,13 @@ async def reset_agent_state(self) -> str: # 6. Clear usage data (chat, actions, tasks, usage) await self._clear_usage_data() + # 7. Clear persisted session data (tasks, event streams, triggers) + try: + from app.usage.session_storage import get_session_storage + get_session_storage().clear_all() + except Exception as e: + logger.warning(f"[RESET] Failed to clear session storage: {e}") + return "Agent state reset. Agent file system reinitialized." async def _clear_usage_data(self) -> None: @@ -2283,6 +2295,227 @@ async def _shutdown_mcp(self) -> None: except Exception as e: logger.warning(f"[MCP] Error during MCP shutdown: {e}") + # ===================================== + # Session Persistence & Restoration + # ===================================== + + def _restore_sessions(self) -> set: + """ + Restore active tasks and event streams from the previous session. + + Called during __init__ after all components are initialized. 
+ Returns a set of restored task IDs (used to exclude their temp dirs + from cleanup). + """ + restored_ids = set() + try: + from app.usage.session_storage import get_session_storage + from agent_core.core.impl.event_stream.event_stream import get_cached_token_count + storage = get_session_storage() + + # 1. Restore main event stream + head_summary, records = storage.get_event_stream("__main__") + if head_summary or records: + main_stream = self.event_stream_manager.get_main_stream() + main_stream.head_summary = head_summary + main_stream.tail_events = records + main_stream._total_tokens = sum( + get_cached_token_count(r) for r in records + ) + logger.info( + f"[RESTORE] Restored main event stream " + f"({len(records)} events)" + ) + + # 2. Restore conversation history + conv_events = storage.get_conversation_history() + if conv_events: + self.event_stream_manager._conversation_history = conv_events + logger.info( + f"[RESTORE] Restored {len(conv_events)} conversation history messages" + ) + + # 3. Restore active tasks and their event streams + active_tasks = storage.get_all_active_tasks() + for task_data in active_tasks: + try: + task_dict = json.loads(task_data["task_json"]) + task = Task.from_dict(task_dict) + task_id = task.id + + # Recreate temp directory + temp_dir = self.task_manager._prepare_task_temp_dir(task_id) + task.temp_dir = str(temp_dir) + + # Insert task into TaskManager + self.task_manager.tasks[task_id] = task + self.task_manager._current_session_id = task_id + + # Create and restore per-task event stream + stream = self.event_stream_manager.create_stream( + task_id, temp_dir + ) + t_head, t_records = storage.get_event_stream(task_id) + stream.head_summary = t_head + stream.tail_events = t_records + stream._total_tokens = sum( + get_cached_token_count(r) for r in t_records + ) + + # Log restoration event + self.event_stream_manager.log( + "system", + "Task restored after agent restart. 
" + "Resuming from previous state.", + task_id=task_id, + ) + + # Recreate LLM session caches + self.task_manager._create_session_caches(task_id) + + # Sync with state manager + if self.state_manager: + self.state_manager.on_task_created(task) + self.state_manager.add_to_active_task(task=task) + + restored_ids.add(task_id) + logger.info( + f"[RESTORE] Restored task '{task.name}' " + f"(id={task_id}, status={task.status}, " + f"events={len(t_records)})" + ) + + except Exception as e: + logger.warning( + f"[RESTORE] Failed to restore task " + f"{task_data.get('task_id', '?')}: {e}" + ) + # Remove corrupt task data + try: + storage.remove_task(task_data.get("task_id", "")) + except Exception: + pass + + if restored_ids: + logger.info( + f"[RESTORE] Successfully restored {len(restored_ids)} " + f"task(s) from previous session" + ) + + except Exception as e: + logger.warning(f"[RESTORE] Session restoration failed: {e}") + + return restored_ids + + def _persist_all_sessions(self) -> None: + """ + Persist all active tasks, event streams, and conversation history. + + Called during graceful shutdown to ensure state survives restarts. + """ + try: + from app.usage.session_storage import get_session_storage + storage = get_session_storage() + + # 1. Persist all active tasks and their event streams + task_count = 0 + for task_id, task in self.task_manager.tasks.items(): + try: + storage.persist_task(task) + # Persist this task's event stream + stream = self.event_stream_manager.get_stream_by_id(task_id) + if stream: + storage.persist_event_stream(task_id, stream) + task_count += 1 + except Exception as e: + logger.warning( + f"[PERSIST] Failed to persist task {task_id}: {e}" + ) + + # 2. Persist main event stream + try: + main_stream = self.event_stream_manager.get_main_stream() + storage.persist_main_stream(main_stream) + except Exception as e: + logger.warning(f"[PERSIST] Failed to persist main stream: {e}") + + # 3. 
Persist conversation history + try: + conv_history = self.event_stream_manager._conversation_history + if conv_history: + storage.persist_conversation_history(conv_history) + except Exception as e: + logger.warning( + f"[PERSIST] Failed to persist conversation history: {e}" + ) + + if task_count > 0: + logger.info( + f"[PERSIST] Saved {task_count} active task(s) and " + f"event streams for recovery" + ) + + except Exception as e: + logger.warning(f"[PERSIST] Session persistence failed: {e}") + + async def _schedule_restored_task_triggers(self) -> None: + """ + Schedule triggers for tasks restored from the previous session. + + Running tasks get an immediate continuation trigger. + Tasks waiting for user reply get a waiting trigger. + """ + if not hasattr(self, '_restored_task_ids') or not self._restored_task_ids: + return + + for task_id in self._restored_task_ids: + task = self.task_manager.tasks.get(task_id) + if not task or task.status != "running": + continue + + try: + if task.waiting_for_user_reply: + await self.triggers.put( + Trigger( + fire_at=time.time(), + priority=5, + next_action_description=( + "Waiting for user reply " + "(resumed after restart)" + ), + session_id=task_id, + payload={"gui_mode": STATE.gui_mode}, + waiting_for_reply=True, + ), + skip_merge=True, + ) + logger.info( + f"[RESTORE] Scheduled waiting trigger for " + f"task '{task.name}'" + ) + else: + await self.triggers.put( + Trigger( + fire_at=time.time(), + priority=5, + next_action_description=( + "Resume task after agent restart" + ), + session_id=task_id, + payload={"gui_mode": STATE.gui_mode}, + ), + skip_merge=True, + ) + logger.info( + f"[RESTORE] Scheduled resume trigger for " + f"task '{task.name}'" + ) + except Exception as e: + logger.warning( + f"[RESTORE] Failed to schedule trigger for " + f"task {task_id}: {e}" + ) + # ===================================== # Skills Integration # ===================================== @@ -2493,6 +2726,9 @@ def print_startup_step(step: int, 
total: int, message: str): name="scheduler_config.json" ) + # Resume triggers for tasks restored from previous session + await self._schedule_restored_task_triggers() + # Trigger soft onboarding if needed (BEFORE starting interface) # This ensures agent handles onboarding logic, not the interfaces from app.onboarding import onboarding_manager @@ -2553,6 +2789,8 @@ def print_startup_step(step: int, total: int, message: str): await interface.start() finally: + # Persist all active sessions before shutdown (for crash recovery) + self._persist_all_sessions() # Shutdown scheduler (handles all periodic tasks including memory processing) self.is_running = False await self.scheduler.shutdown() diff --git a/app/config/settings.json b/app/config/settings.json index 669d7ebd..5738fc68 100644 --- a/app/config/settings.json +++ b/app/config/settings.json @@ -22,7 +22,7 @@ "openai": "", "anthropic": "", "google": "", - "byteplus": "", + "byteplus": "", "deepseek": "" }, "endpoints": { diff --git a/app/data/agent_file_system_template/SOUL.md b/app/data/agent_file_system_template/SOUL.md index b663cad3..4aa8f720 100644 --- a/app/data/agent_file_system_template/SOUL.md +++ b/app/data/agent_file_system_template/SOUL.md @@ -1,27 +1,24 @@ # Soul ## Personality -- Friendly, warm, and approachable — but not over-the-top cheerful -- Witty and occasionally humorous when appropriate, never forced -- Confident and direct — say what you mean without hedging -- Curious and genuinely interested in what the user is working on -- Patient with mistakes, encouraging with progress +- Friendly, warm, and approachable, but don't overdo it +- Be direct, say what you mean without hedging, get straight to the point +- Proactive, you care about user more than they do, and always try to help the user improve ## Tone -- Conversational but professional — like a trusted colleague - Concise by default, detailed when it matters +- Be concrete without overusing fancy words - 
No corporate jargon or filler phrases -- Match the user's energy — casual if they're casual, focused if they're focused +- Match the user's energy. Casual if they're casual, focused if they're focused ## Behavior - Be proactive: suggest improvements, flag potential issues, offer alternatives -- Own your mistakes — if you get something wrong, acknowledge it simply and fix it -- Don't over-explain unless asked — respect the user's intelligence +- Own your mistakes. If you get something wrong, acknowledge it simply and fix it +- Don't over-explain unless asked - When uncertain, say so honestly rather than guessing confidently -- Celebrate small wins without being patronizing +- Use emoji sparingly +- Chat like a human would, don't overuse lists or em dashes. ## Quirks -- (Add personality quirks here — e.g., "Always uses cooking metaphors", "Ends complex explanations with a one-liner summary") +- Format your message like a human would -## Special Instructions -- (Add any special behavioral instructions here — e.g., "Always greet users by name", "Use emoji sparingly", "Default to bullet points for lists") diff --git a/app/task/task_manager.py b/app/task/task_manager.py index fc821d0a..cb338ea5 100644 --- a/app/task/task_manager.py +++ b/app/task/task_manager.py @@ -10,11 +10,13 @@ from pathlib import Path from agent_core.core.impl.task import TaskManager as _TaskManager +from agent_core.core.task import Task from app.database_interface import DatabaseInterface from app.event_stream import EventStreamManager from app.state.state_manager import StateManager from app.state.agent_state import STATE from app.config import AGENT_WORKSPACE_ROOT, AGENT_FILE_SYSTEM_PATH +from app.logger import logger if TYPE_CHECKING: from app.llm import LLMInterface @@ -48,6 +50,24 @@ def on_stream_create(task_id: str, temp_dir: Path) -> None: return on_stream_create +def _on_task_persist(task: Task) -> None: + """Persist task state to SessionStorage for crash recovery.""" + try: + from
app.usage.session_storage import get_session_storage + get_session_storage().persist_task(task) + except Exception as e: + logger.warning(f"[TaskManager] Failed to persist task {task.id}: {e}") + + +def _on_task_remove_persist(task_id: str) -> None: + """Remove persisted task and its event stream from SessionStorage.""" + try: + from app.usage.session_storage import get_session_storage + get_session_storage().remove_task(task_id) + except Exception as e: + logger.warning(f"[TaskManager] Failed to remove persisted task {task_id}: {e}") + + def _make_on_stream_remove(event_stream_manager: EventStreamManager): """Create hook for event stream removal on task completion.""" def on_stream_remove(task_id: str) -> None: @@ -90,6 +110,9 @@ def __init__( # Event stream hooks for per-task streams (CRITICAL for multi-tasking) on_stream_create=_make_on_stream_create(event_stream_manager), on_stream_remove=_make_on_stream_remove(event_stream_manager), + # Session persistence hooks for crash recovery + on_task_persist=_on_task_persist, + on_task_remove_persist=_on_task_remove_persist, # No chatserver hooks for CraftBot (local only) on_task_created_chatserver=None, on_todo_transition=None, diff --git a/app/ui_layer/adapters/browser_adapter.py b/app/ui_layer/adapters/browser_adapter.py index c20337b9..81f57f6f 100644 --- a/app/ui_layer/adapters/browser_adapter.py +++ b/app/ui_layer/adapters/browser_adapter.py @@ -357,8 +357,11 @@ def _init_storage(self) -> None: from app.usage.action_storage import get_action_storage, StoredActionItem self._storage = get_action_storage() - # Mark any stale running items as cancelled from previous session - self._storage.mark_running_as_cancelled() + # Mark stale running items as cancelled, but exclude restored tasks + restored_ids = getattr( + self._adapter._controller.agent, '_restored_task_ids', set() + ) + self._storage.mark_running_as_cancelled(exclude=restored_ids) # Load recent tasks (and their child actions) from storage stored_items = 
self._storage.get_recent_tasks_with_actions(task_limit=15) diff --git a/app/ui_layer/controller/ui_controller.py b/app/ui_layer/controller/ui_controller.py index bf6f00f6..f65125bf 100644 --- a/app/ui_layer/controller/ui_controller.py +++ b/app/ui_layer/controller/ui_controller.py @@ -286,6 +286,20 @@ async def submit_message( async def _watch_agent_events(self) -> None: """Watch and transform agent events to UI events.""" + # Mark all pre-existing events as seen so restored events + # from previous sessions are not emitted as new UI messages. + # State-updating events (task_start, task_end) are still processed + # to rebuild UI state (e.g., show restored tasks as running). + streams = self._agent.event_stream_manager.get_all_streams_with_ids() + for task_id, stream in streams: + for event in stream.as_list(): + key = (event.iso_ts, event.kind, event.message) + self._state_store.dispatch("MARK_EVENT_SEEN", key) + # Rebuild UI state from restored events without emitting to UI + ui_event = EventTransformer.transform(event, task_id) + if ui_event: + self._update_state_from_event(ui_event) + while self._running and self._agent.is_running: try: # Get all event streams diff --git a/app/usage/action_storage.py b/app/usage/action_storage.py index 265860e4..1c41c154 100644 --- a/app/usage/action_storage.py +++ b/app/usage/action_storage.py @@ -354,24 +354,36 @@ def delete_item(self, item_id: str) -> bool: conn.commit() return cursor.rowcount > 0 - def mark_running_as_cancelled(self) -> int: + def mark_running_as_cancelled(self, exclude: Optional[set] = None) -> int: """ - Mark all running items as cancelled. + Mark running items as cancelled, optionally excluding some. This should be called on startup to clean up stale running items from a previous session. + Args: + exclude: Set of item IDs to skip (e.g., restored tasks that + are still legitimately running). + Returns: Number of items updated. 
""" import time as time_module with sqlite3.connect(self._db_path) as conn: cursor = conn.cursor() - cursor.execute(""" - UPDATE action_items - SET status = 'cancelled', completed_at = ? - WHERE status = 'running' - """, (time_module.time(),)) + if exclude: + placeholders = ",".join("?" for _ in exclude) + cursor.execute(f""" + UPDATE action_items + SET status = 'cancelled', completed_at = ? + WHERE status = 'running' AND id NOT IN ({placeholders}) + """, (time_module.time(), *exclude)) + else: + cursor.execute(""" + UPDATE action_items + SET status = 'cancelled', completed_at = ? + WHERE status = 'running' + """, (time_module.time(),)) conn.commit() return cursor.rowcount diff --git a/app/usage/session_storage.py b/app/usage/session_storage.py new file mode 100644 index 00000000..9eac006c --- /dev/null +++ b/app/usage/session_storage.py @@ -0,0 +1,349 @@ +# -*- coding: utf-8 -*- +""" +app.usage.session_storage + +SQLite-based storage for active session state (tasks + event streams). +Provides persistence across agent restarts so that running tasks and their +event context can be restored. +""" + +from __future__ import annotations + +import json +import logging +import sqlite3 +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from agent_core.core.task import Task +from agent_core.core.event_stream.event import Event, EventRecord +from agent_core.core.impl.event_stream.event_stream import EventStream + +try: + from app.logger import logger +except Exception: + logger = logging.getLogger(__name__) + logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + + +# Sentinel stream ID for the main (non-task) event stream +MAIN_STREAM_ID = "__main__" + +# Tasks older than this (in hours) are considered stale and not restored +STALE_TASK_HOURS = 24 + + +class SessionStorage: + """ + SQLite-based storage for active session state. 
+ + Persists running tasks and their event streams so they can be restored + after an agent restart. Completed/cancelled tasks are removed from this + store (they live in task_storage.py for analytics). + """ + + def __init__(self, db_path: Optional[str] = None): + if db_path is None: + from app.config import APP_DATA_PATH + usage_dir = Path(APP_DATA_PATH) / ".usage" + usage_dir.mkdir(parents=True, exist_ok=True) + db_path = str(usage_dir / "sessions.db") + + self._db_path = db_path + self._init_db() + logger.info(f"[SessionStorage] Initialized at {self._db_path}") + + def _init_db(self) -> None: + """Initialize the database schema.""" + with sqlite3.connect(self._db_path) as conn: + conn.execute("PRAGMA journal_mode=WAL") + cursor = conn.cursor() + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS active_tasks ( + task_id TEXT PRIMARY KEY, + task_json TEXT NOT NULL, + updated_at TEXT NOT NULL + ) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS event_streams ( + stream_id TEXT PRIMARY KEY, + head_summary TEXT, + updated_at TEXT NOT NULL + ) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS event_records ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + stream_id TEXT NOT NULL, + event_json TEXT NOT NULL, + position INTEGER NOT NULL, + FOREIGN KEY (stream_id) REFERENCES event_streams(stream_id) + ) + """) + + cursor.execute(""" + CREATE INDEX IF NOT EXISTS idx_event_records_stream + ON event_records(stream_id, position) + """) + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS conversation_history ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_json TEXT NOT NULL, + position INTEGER NOT NULL + ) + """) + + # Clean up triggers table from previous versions (no longer used) + cursor.execute("DROP TABLE IF EXISTS triggers") + + conn.commit() + + # ─────────────────────── Task Persistence ─────────────────────────────── + + def persist_task(self, task: Task) -> None: + """Upsert a task into the active_tasks table.""" + now = 
datetime.now(timezone.utc).isoformat() + task_json = json.dumps(task.to_dict(), default=str) + with sqlite3.connect(self._db_path) as conn: + conn.execute( + """ + INSERT INTO active_tasks (task_id, task_json, updated_at) + VALUES (?, ?, ?) + ON CONFLICT(task_id) DO UPDATE SET + task_json = excluded.task_json, + updated_at = excluded.updated_at + """, + (task.id, task_json, now), + ) + conn.commit() + + def remove_task(self, task_id: str) -> None: + """Remove a task and its associated event stream from persistence.""" + with sqlite3.connect(self._db_path) as conn: + conn.execute("DELETE FROM active_tasks WHERE task_id = ?", (task_id,)) + conn.execute("DELETE FROM event_records WHERE stream_id = ?", (task_id,)) + conn.execute("DELETE FROM event_streams WHERE stream_id = ?", (task_id,)) + conn.commit() + + def get_all_active_tasks(self) -> List[Dict[str, Any]]: + """Return all active tasks, filtering out stale ones.""" + with sqlite3.connect(self._db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT task_id, task_json, updated_at FROM active_tasks" + ) + rows = cursor.fetchall() + + now = datetime.now(timezone.utc) + results = [] + stale_ids = [] + + for task_id, task_json, updated_at in rows: + try: + updated = datetime.fromisoformat(updated_at) + # Make timezone-aware if naive + if updated.tzinfo is None: + updated = updated.replace(tzinfo=timezone.utc) + age_hours = (now - updated).total_seconds() / 3600 + if age_hours > STALE_TASK_HOURS: + stale_ids.append(task_id) + logger.info( + f"[SessionStorage] Skipping stale task {task_id} " + f"(last updated {age_hours:.1f}h ago)" + ) + continue + except (ValueError, TypeError): + pass # If we can't parse the timestamp, include the task + + results.append({ + "task_id": task_id, + "task_json": task_json, + "updated_at": updated_at, + }) + + # Clean up stale tasks + if stale_ids: + with sqlite3.connect(self._db_path) as conn: + for tid in stale_ids: + conn.execute("DELETE FROM active_tasks WHERE task_id = 
?", (tid,)) + conn.execute("DELETE FROM event_records WHERE stream_id = ?", (tid,)) + conn.execute("DELETE FROM event_streams WHERE stream_id = ?", (tid,)) + conn.commit() + logger.info(f"[SessionStorage] Cleaned up {len(stale_ids)} stale tasks") + + return results + + # ─────────────────────── Event Stream Persistence ─────────────────────── + + def persist_event_stream(self, stream_id: str, stream: EventStream) -> None: + """Persist an event stream's head_summary and tail_events.""" + now = datetime.now(timezone.utc).isoformat() + with sqlite3.connect(self._db_path) as conn: + # Upsert stream metadata + conn.execute( + """ + INSERT INTO event_streams (stream_id, head_summary, updated_at) + VALUES (?, ?, ?) + ON CONFLICT(stream_id) DO UPDATE SET + head_summary = excluded.head_summary, + updated_at = excluded.updated_at + """, + (stream_id, stream.head_summary, now), + ) + + # Replace all event records for this stream + conn.execute( + "DELETE FROM event_records WHERE stream_id = ?", (stream_id,) + ) + + for position, record in enumerate(stream.tail_events): + event_json = json.dumps(record.to_dict(), default=str) + conn.execute( + """ + INSERT INTO event_records (stream_id, event_json, position) + VALUES (?, ?, ?) + """, + (stream_id, event_json, position), + ) + + conn.commit() + + def persist_main_stream(self, stream: EventStream) -> None: + """Shorthand for persisting the main (non-task) event stream.""" + self.persist_event_stream(MAIN_STREAM_ID, stream) + + def remove_event_stream(self, stream_id: str) -> None: + """Remove a persisted event stream and its records.""" + with sqlite3.connect(self._db_path) as conn: + conn.execute("DELETE FROM event_records WHERE stream_id = ?", (stream_id,)) + conn.execute("DELETE FROM event_streams WHERE stream_id = ?", (stream_id,)) + conn.commit() + + def get_event_stream( + self, stream_id: str + ) -> Tuple[Optional[str], List[EventRecord]]: + """ + Restore an event stream's data. 
+ + Returns: + Tuple of (head_summary, list of EventRecord objects). + """ + with sqlite3.connect(self._db_path) as conn: + cursor = conn.cursor() + + # Get head summary + cursor.execute( + "SELECT head_summary FROM event_streams WHERE stream_id = ?", + (stream_id,), + ) + row = cursor.fetchone() + head_summary = row[0] if row else None + + # Get event records ordered by position + cursor.execute( + """ + SELECT event_json FROM event_records + WHERE stream_id = ? + ORDER BY position ASC + """, + (stream_id,), + ) + records = [] + for (event_json,) in cursor.fetchall(): + try: + data = json.loads(event_json) + records.append(EventRecord.from_dict(data)) + except (json.JSONDecodeError, KeyError, TypeError) as e: + logger.warning( + f"[SessionStorage] Skipping corrupt event record " + f"for stream {stream_id}: {e}" + ) + + return head_summary, records + + # ─────────────────────── Conversation History ─────────────────────────── + + def persist_conversation_history(self, messages: List[Event]) -> None: + """Replace persisted conversation history with the current list.""" + with sqlite3.connect(self._db_path) as conn: + conn.execute("DELETE FROM conversation_history") + for position, event in enumerate(messages): + event_json = json.dumps(event.to_dict(), default=str) + conn.execute( + """ + INSERT INTO conversation_history (event_json, position) + VALUES (?, ?) 
+ """, + (event_json, position), + ) + conn.commit() + + def get_conversation_history(self) -> List[Event]: + """Restore conversation history.""" + with sqlite3.connect(self._db_path) as conn: + cursor = conn.cursor() + cursor.execute( + "SELECT event_json FROM conversation_history ORDER BY position ASC" + ) + events = [] + for (event_json,) in cursor.fetchall(): + try: + data = json.loads(event_json) + events.append(Event.from_dict(data)) + except (json.JSONDecodeError, KeyError, TypeError) as e: + logger.warning( + f"[SessionStorage] Skipping corrupt conversation event: {e}" + ) + return events + + # ─────────────────────── Trigger Persistence ────────────────────────────── + + # ─────────────────────── Utilities ─────────────────────────────────────── + + def clear_all(self) -> None: + """Wipe all persisted session data.""" + with sqlite3.connect(self._db_path) as conn: + conn.execute("DELETE FROM active_tasks") + conn.execute("DELETE FROM event_records") + conn.execute("DELETE FROM event_streams") + conn.execute("DELETE FROM conversation_history") + conn.commit() + logger.info("[SessionStorage] Cleared all session data") + + def get_stats(self) -> Dict[str, Any]: + """Get storage statistics.""" + with sqlite3.connect(self._db_path) as conn: + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM active_tasks") + task_count = cursor.fetchone()[0] + cursor.execute("SELECT COUNT(*) FROM event_streams") + stream_count = cursor.fetchone()[0] + cursor.execute("SELECT COUNT(*) FROM event_records") + record_count = cursor.fetchone()[0] + cursor.execute("SELECT COUNT(*) FROM conversation_history") + conv_count = cursor.fetchone()[0] + return { + "db_path": self._db_path, + "active_tasks": task_count, + "event_streams": stream_count, + "event_records": record_count, + "conversation_messages": conv_count, + } + + +# Global storage instance +_session_storage: Optional[SessionStorage] = None + + +def get_session_storage() -> SessionStorage: + """Get the global 
session storage instance.""" + global _session_storage + if _session_storage is None: + _session_storage = SessionStorage() + return _session_storage