Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion agent_core/core/event_stream/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional, List
from typing import Any, Dict, Optional, List


SEVERITIES = ("DEBUG", "INFO", "WARN", "ERROR")
Expand Down Expand Up @@ -64,6 +64,32 @@ def display_text(self) -> Optional[str]:
"""
return self.display_message

def to_dict(self) -> Dict[str, Any]:
"""Serialize the event to a dictionary for persistence."""
return {
"message": self.message,
"kind": self.kind,
"severity": self.severity,
"display_message": self.display_message,
"ts": self.ts.isoformat(),
}

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Event":
"""Deserialize an event from a dictionary."""
ts = (
datetime.fromisoformat(data["ts"])
if isinstance(data.get("ts"), str)
else datetime.now(timezone.utc)
)
return cls(
message=data["message"],
kind=data["kind"],
severity=data["severity"],
display_message=data.get("display_message"),
ts=ts,
)

@property
def iso_ts(self) -> str:
"""
Expand Down Expand Up @@ -92,6 +118,29 @@ class EventRecord:
repeat_count: int = 1
_cached_tokens: int | None = field(default=None, repr=False)

def to_dict(self) -> Dict[str, Any]:
"""Serialize the event record to a dictionary for persistence."""
return {
"event": self.event.to_dict(),
"ts": self.ts.isoformat(),
"repeat_count": self.repeat_count,
}

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "EventRecord":
"""Deserialize an event record from a dictionary."""
event = Event.from_dict(data["event"])
ts = (
datetime.fromisoformat(data["ts"])
if isinstance(data.get("ts"), str)
else datetime.now(timezone.utc)
)
return cls(
event=event,
ts=ts,
repeat_count=data.get("repeat_count", 1),
)

def compact_line(self) -> str:
"""
Generate a compact single-line representation of this event.
Expand Down
10 changes: 8 additions & 2 deletions agent_core/core/impl/event_stream/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from typing import Callable, Dict, List, Optional
import threading

from agent_core.core.impl.event_stream.event_stream import EventStream
Expand Down Expand Up @@ -64,7 +64,9 @@ class EventStreamManager:
def __init__(
self,
llm: LLMInterfaceProtocol,
agent_file_system_path: Optional[Path] = None
agent_file_system_path: Optional[Path] = None,
on_stream_persist: Optional[Callable[[str, "EventStream"], None]] = None,
on_stream_remove_persist: Optional[Callable[[str], None]] = None,
) -> None:
# Main stream for conversation mode (not task-specific)
self._main_stream: EventStream = EventStream(llm=llm, temp_dir=None)
Expand All @@ -77,6 +79,10 @@ def __init__(
self._skip_unprocessed_logging = False
self._file_lock = threading.Lock()

# Session persistence hooks
self._on_stream_persist = on_stream_persist
self._on_stream_remove_persist = on_stream_remove_persist

# Conversation history for context injection into tasks
# Stores recent user AND agent messages without affecting TUI display
self._conversation_history: List[Event] = []
Expand Down
42 changes: 38 additions & 4 deletions agent_core/core/impl/task/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@
OnStreamCreateHook = Callable[[str, Path], None] # (task_id, temp_dir)
OnStreamRemoveHook = Callable[[str], None] # (task_id)

# Session persistence hooks
OnTaskPersistHook = Callable[["Task"], None] # (task)
OnTaskRemovePersistHook = Callable[[str], None] # (task_id)

# Chatserver hooks (WCA only)
OnTaskCreatedChatserverHook = Callable[[Task], None]
OnTodoTransitionHook = Callable[[List[tuple]], None] # List of (todo, old_status, new_status)
Expand Down Expand Up @@ -94,6 +98,9 @@ def __init__(
# Event stream hooks
on_stream_create: Optional[OnStreamCreateHook] = None,
on_stream_remove: Optional[OnStreamRemoveHook] = None,
# Session persistence hooks
on_task_persist: Optional[OnTaskPersistHook] = None,
on_task_remove_persist: Optional[OnTaskRemovePersistHook] = None,
# Chatserver hooks (WCA only)
on_task_created_chatserver: Optional[OnTaskCreatedChatserverHook] = None,
on_todo_transition: Optional[OnTodoTransitionHook] = None,
Expand Down Expand Up @@ -124,6 +131,10 @@ def __init__(
on_stream_create: Called to set up event stream for task.
on_stream_remove: Called to clean up event stream on task end.

Session persistence hooks:
on_task_persist: Called on every task state change to persist task to disk.
on_task_remove_persist: Called when task ends to remove persisted data.

Chatserver hooks (WCA only):
on_task_created_chatserver: POST task to chatserver.
on_todo_transition: Report todo transitions to chatserver.
Expand Down Expand Up @@ -156,6 +167,10 @@ def __init__(
self._on_stream_create = on_stream_create
self._on_stream_remove = on_stream_remove

# Session persistence hooks
self._on_task_persist = on_task_persist
self._on_task_remove_persist = on_task_remove_persist

# Chatserver hooks (WCA only, default to None/no-op)
self._on_task_created_chatserver = on_task_created_chatserver
self._on_todo_transition = on_todo_transition
Expand Down Expand Up @@ -616,6 +631,13 @@ async def _end_task(
if self._current_session_id == task.id:
self._current_session_id = None

# Remove persisted session data (task + event stream)
if self._on_task_remove_persist:
try:
self._on_task_remove_persist(task.id)
except Exception as e:
logger.warning(f"[TaskManager] Failed to remove persisted task {task.id}: {e}")

# Clean up session-specific state (multi-task isolation)
StateSession.end(task.id)

Expand Down Expand Up @@ -658,9 +680,15 @@ async def _end_task(
logger.warning(f"[ONBOARDING] Failed to mark soft onboarding complete: {e}")

def _sync_state_manager(self, task: Optional[Task]) -> None:
"""Sync task state to the state manager."""
"""Sync task state to the state manager and persist to disk."""
if self.state_manager:
self.state_manager.add_to_active_task(task=task)
# Persist task state for crash recovery
if task and self._on_task_persist:
try:
self._on_task_persist(task)
except Exception as e:
logger.warning(f"[TaskManager] Failed to persist task {task.id}: {e}")

def _log_to_task_history(self, task: Task, note: Optional[str] = None) -> None:
"""Log completed task to TASK_HISTORY.md."""
Expand Down Expand Up @@ -729,16 +757,22 @@ def _cleanup_task_temp_dir(self, task: Task) -> None:
except Exception:
logger.warning(f"[TaskManager] Failed to clean temp dir for {task.id}", exc_info=True)

def cleanup_all_temp_dirs(self) -> int:
"""Remove all temporary directories in workspace/tmp/."""
def cleanup_all_temp_dirs(self, exclude: Optional[set] = None) -> int:
"""Remove temporary directories in workspace/tmp/, optionally excluding some.

Args:
exclude: Set of task IDs whose temp directories should be preserved
(e.g., restored tasks that need their workspace).
"""
temp_root = self.workspace_root / "tmp"
if not temp_root.exists():
return 0

exclude = exclude or set()
cleaned_count = 0
try:
for item in temp_root.iterdir():
if item.is_dir():
if item.is_dir() and item.name not in exclude:
try:
shutil.rmtree(item, ignore_errors=True)
cleaned_count += 1
Expand Down
3 changes: 2 additions & 1 deletion agent_core/core/impl/trigger/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,14 +156,15 @@ def create_task_state(self) -> str:

async def clear(self) -> None:
"""
Remove all pending triggers from the queue.
Remove all pending and active triggers from the queue.

The queue is cleared under the protection of the condition variable so
waiting consumers are notified immediately that the queue state has
changed.
"""
async with self._cv:
self._heap.clear()
self._active.clear()
self._cv.notify_all()

# =================================================================
Expand Down
Loading