Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions components/backend/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func registerRoutes(r *gin.Engine) {
projectGroup.GET("/agentic-sessions/:sessionName/agui/history", websocket.HandleAGUIHistory)
projectGroup.GET("/agentic-sessions/:sessionName/agui/runs", websocket.HandleAGUIRuns)

// Runner capabilities endpoint
projectGroup.GET("/agentic-sessions/:sessionName/agui/capabilities", websocket.HandleCapabilities)

// MCP status endpoint
projectGroup.GET("/agentic-sessions/:sessionName/mcp/status", websocket.HandleMCPStatus)

Expand Down
5 changes: 4 additions & 1 deletion components/backend/types/agui.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (

// State management events
EventTypeStateSnapshot = "STATE_SNAPSHOT"
EventTypStateDelta = "STATE_DELTA"
EventTypeStateDelta = "STATE_DELTA"

// Message snapshot for restore/reconnect
EventTypeMessagesSnapshot = "MESSAGES_SNAPSHOT"
Expand All @@ -56,6 +56,9 @@ const (
// Raw event for pass-through
EventTypeRaw = "RAW"

// Custom event for platform extensions
EventTypeCustom = "CUSTOM"

// META event for user feedback (thumbs up/down)
// See: https://docs.ag-ui.com/drafts/meta-events
EventTypeMeta = "META"
Expand Down
260 changes: 151 additions & 109 deletions components/backend/websocket/agui.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ func RouteAGUIEvent(sessionID string, event map[string]interface{}) {
// Persist the event (use runID from event, not activeRunState)
go persistAGUIEventMap(sessionID, runID, event)

// If this is a MESSAGES_SNAPSHOT, also persist as the latest snapshot
// for fast reconnect (avoids replaying the full event log)
if eventType == types.EventTypeMessagesSnapshot {
go persistMessagesSnapshot(sessionID, event)
}

// Check for terminal events - mark run as complete
if isTerminalEventType(eventType) {
activeRunState.Status = getTerminalStatusFromType(eventType)
Expand All @@ -207,10 +213,113 @@ func RouteAGUIEvent(sessionID string, event map[string]interface{}) {
}
}

// loadCompactedMessages loads pre-compacted messages from completed runs
// NOTE: Removed loadCompactedMessages and compactAndPersistRun functions.
// We now use "compact-on-read" strategy in streamThreadEvents.
// This eliminates race conditions, dual-file complexity, and async compaction issues.
// persistMessagesSnapshot writes the latest MESSAGES_SNAPSHOT event to
// <StateBaseDir>/sessions/<sessionID>/messages-snapshot.json so that a
// reconnecting client can be served from this single file instead of
// replaying every event through the compactor.
//
// The function is best-effort: every failure is logged and swallowed, and
// nothing is returned.
//
// The payload is first written to a temporary file in the same directory and
// then renamed over the final path. Writing in place would truncate the file
// first, so a reader running at the same moment could see an empty or partial
// document, and two overlapping writers could interleave their output.
// Renaming within one directory replaces the file in a single step on POSIX
// systems.
func persistMessagesSnapshot(sessionID string, event map[string]interface{}) {
	dir := fmt.Sprintf("%s/sessions/%s", StateBaseDir, sessionID)
	path := fmt.Sprintf("%s/sessions/%s/messages-snapshot.json", StateBaseDir, sessionID)

	// Without the directory the write below cannot succeed; report the
	// root cause instead of a confusing follow-up write error.
	if err := ensureDir(dir); err != nil {
		log.Printf("AGUI: failed to create session directory for MESSAGES_SNAPSHOT: %v", err)
		return
	}

	data, err := json.Marshal(event)
	if err != nil {
		log.Printf("AGUI: failed to marshal MESSAGES_SNAPSHOT for persistence: %v", err)
		return
	}

	// A unique temp name per call keeps concurrent writers from sharing a file.
	tmp, err := os.CreateTemp(dir, "messages-snapshot-*.tmp")
	if err != nil {
		log.Printf("AGUI: failed to write MESSAGES_SNAPSHOT: %v", err)
		return
	}
	tmpPath := tmp.Name()

	_, writeErr := tmp.Write(data)
	// Always close, but keep the first error encountered.
	if closeErr := tmp.Close(); writeErr == nil {
		writeErr = closeErr
	}
	if writeErr == nil {
		// CreateTemp uses 0600; restore the mode the snapshot file had before.
		writeErr = os.Chmod(tmpPath, 0644)
	}
	if writeErr == nil {
		writeErr = os.Rename(tmpPath, path)
	}
	if writeErr != nil {
		// Best-effort cleanup; the temp file is useless once any step failed.
		_ = os.Remove(tmpPath)
		log.Printf("AGUI: failed to write MESSAGES_SNAPSHOT: %v", writeErr)
		return
	}
	log.Printf("AGUI: Persisted MESSAGES_SNAPSHOT for session %s (%d bytes)", sessionID, len(data))
}

// loadPersistedSnapshot reads the latest MESSAGES_SNAPSHOT for sessionID from
// <StateBaseDir>/sessions/<sessionID>/messages-snapshot.json.
//
// It returns the decoded event, or nil when no usable snapshot is available:
//   - the file does not exist (the normal "no snapshot yet" case, not logged);
//   - the file exists but cannot be read (logged);
//   - the file does not contain valid JSON for a map (logged).
func loadPersistedSnapshot(sessionID string) map[string]interface{} {
	path := fmt.Sprintf("%s/sessions/%s/messages-snapshot.json", StateBaseDir, sessionID)
	data, err := os.ReadFile(path)
	if err != nil {
		// Only a missing file is expected; surface permission or I/O
		// problems so they are not mistaken for an empty history.
		if !os.IsNotExist(err) {
			log.Printf("AGUI: failed to read persisted MESSAGES_SNAPSHOT: %v", err)
		}
		return nil
	}

	var event map[string]interface{}
	if err := json.Unmarshal(data, &event); err != nil {
		log.Printf("AGUI: failed to unmarshal persisted MESSAGES_SNAPSHOT: %v", err)
		return nil
	}
	return event
}

// messagesFromSnapshot converts the "messages" array of a raw
// MESSAGES_SNAPSHOT event into typed Message values.
//
// It returns nil when snapshot is nil, when "messages" is missing, is not an
// array, or is empty. Array entries that are not JSON objects are skipped.
// Within each message only string-valued "id", "role", "content" and
// "timestamp" fields are copied. A "toolCalls" array is decoded entry by
// entry; entries that are not objects are skipped. Fields with any other
// name or an unexpected type are ignored.
func messagesFromSnapshot(snapshot map[string]interface{}) []types.Message {
	// Indexing a nil map is safe and yields a nil interface, so the nil
	// snapshot case falls out of the same check.
	rawMessages, _ := snapshot["messages"].([]interface{})
	if len(rawMessages) == 0 {
		return nil
	}

	// decodeToolCall copies the recognised string fields of one tool call.
	decodeToolCall := func(fields map[string]interface{}) types.ToolCall {
		var call types.ToolCall
		for key, raw := range fields {
			text, isText := raw.(string)
			if !isText {
				continue
			}
			switch key {
			case "id":
				call.ID = text
			case "name":
				call.Name = text
			case "args":
				call.Args = text
			case "type":
				call.Type = text
			case "parentToolUseId":
				call.ParentToolUseID = text
			case "result":
				call.Result = text
			case "status":
				call.Status = text
			case "error":
				call.Error = text
			}
		}
		return call
	}

	out := make([]types.Message, 0, len(rawMessages))
	for _, rawMessage := range rawMessages {
		fields, isObject := rawMessage.(map[string]interface{})
		if !isObject {
			continue
		}

		var message types.Message
		for key, raw := range fields {
			switch value := raw.(type) {
			case string:
				switch key {
				case "id":
					message.ID = value
				case "role":
					message.Role = value
				case "content":
					message.Content = value
				case "timestamp":
					message.Timestamp = value
				}
			case []interface{}:
				if key != "toolCalls" {
					continue
				}
				// Allocate even for an empty array so the result is a
				// non-nil slice whenever the key was present.
				message.ToolCalls = make([]types.ToolCall, 0, len(value))
				for _, rawCall := range value {
					if callFields, isCall := rawCall.(map[string]interface{}); isCall {
						message.ToolCalls = append(message.ToolCalls, decodeToolCall(callFields))
					}
				}
			}
		}
		out = append(out, message)
	}
	return out
}

// persistAGUIEventMap persists a map[string]interface{} event to disk
func persistAGUIEventMap(sessionID, runID string, event map[string]interface{}) {
Expand Down Expand Up @@ -330,114 +439,60 @@ func streamThreadEvents(c *gin.Context, projectName, sessionName string) {
close(eventCh)
}()

// OPTION 1: Compact-on-Read Strategy (COMPLETED RUNS ONLY)
// Load events from agui-events.jsonl and compact only COMPLETED runs
// Active/in-progress runs will be streamed raw

// Declare outside so it's accessible later for replaying active runs
// Use persisted MESSAGES_SNAPSHOT for fast reconnect.
// The runner emits MESSAGES_SNAPSHOT at the end of each run with
// the complete conversation history — no compaction needed.
//
// Declare activeRunIDs outside so it's accessible for replaying active runs.
activeRunIDs := make(map[string]bool)

// Determine which runs are active from in-memory state + event log
events, err := loadEventsForRun(sessionName, "")
if err == nil && len(events) > 0 {

// CRITICAL FIX: Determine which runs are TRULY active by checking event log
// A run is only active if NO terminal event exists in the log
runHasTerminalEvent := make(map[string]bool)
for _, event := range events {
eventRunID, ok := event["runId"].(string)
if !ok {
continue
}
eventType, ok := event["type"].(string)
if !ok {
continue
}

eventRunID, _ := event["runId"].(string)
eventType, _ := event["type"].(string)
if eventRunID != "" && isTerminalEventType(eventType) {
runHasTerminalEvent[eventRunID] = true
}
}

// Check in-memory state and override with event log truth
// Also fix stale in-memory state
aguiRunsMu.Lock()
for _, state := range aguiRuns {
if state.SessionID == sessionName {
runID := state.RunID
// Only consider active if NO terminal event in log
if !runHasTerminalEvent[runID] {
activeRunIDs[runID] = true
} else {
// Fix stale memory state
if state.Status == "running" {
state.Status = "completed"
}
} else if state.Status == "running" {
state.Status = "completed"
}
}
}
aguiRunsMu.Unlock()
} else if err != nil {
log.Printf("AGUI: Failed to load events: %v", err)
}

// Filter to only events from COMPLETED runs (have terminal event)
// Also collect session-level META events (feedback, etc.) which may not have runId
completedEvents := make([]map[string]interface{}, 0)
sessionMetaEvents := make([]map[string]interface{}, 0)
skippedCount := 0
for _, event := range events {
eventType, _ := event["type"].(string)
eventRunID, ok := event["runId"].(string)

// META events may not have runId (session-level feedback) - collect them separately
if eventType == types.EventTypeMeta {
sessionMetaEvents = append(sessionMetaEvents, event)
continue
}

if !ok {
continue
}

// Skip events without runId
if eventRunID == "" {
skippedCount++
continue
}

// Skip events from active runs (no terminal event yet)
if activeRunIDs[eventRunID] {
skippedCount++
continue
}

// Include events from completed runs
completedEvents = append(completedEvents, event)
}

if len(completedEvents) > 0 {
// Compact only completed run events
messages := CompactEvents(completedEvents)

// Send single MESSAGES_SNAPSHOT with compacted messages from COMPLETED runs
if len(messages) > 0 {
snapshot := &types.MessagesSnapshotEvent{
BaseEvent: types.NewBaseEvent(types.EventTypeMessagesSnapshot, threadID, "thread-snapshot"),
Messages: messages,
}
writeSSEEvent(c.Writer, snapshot)
c.Writer.(http.Flusher).Flush()
}
// Send persisted MESSAGES_SNAPSHOT for completed conversation history
persistedSnapshot := loadPersistedSnapshot(sessionName)
if messages := messagesFromSnapshot(persistedSnapshot); len(messages) > 0 {
snapshot := &types.MessagesSnapshotEvent{
BaseEvent: types.NewBaseEvent(types.EventTypeMessagesSnapshot, threadID, "thread-snapshot"),
Messages: messages,
}
writeSSEEvent(c.Writer, snapshot)
c.Writer.(http.Flusher).Flush()
}

// Replay ALL session META events (feedback, tags, annotations)
// META events are session-level and not part of MESSAGES_SNAPSHOT
// They must be replayed regardless of runId to survive reconnects
if len(sessionMetaEvents) > 0 {
for _, event := range sessionMetaEvents {
// Replay session META events (feedback, annotations) — not in MESSAGES_SNAPSHOT
if events != nil {
for _, event := range events {
if eventType, _ := event["type"].(string); eventType == types.EventTypeMeta {
writeSSEEvent(c.Writer, event)
}
c.Writer.(http.Flusher).Flush()
}
} else if err != nil {
log.Printf("AGUI: Failed to load events: %v", err)
c.Writer.(http.Flusher).Flush()
}

// Replay ALL active runs (not just most recent)
Expand Down Expand Up @@ -648,23 +703,15 @@ func sendInitialSyncEvents(c *gin.Context, runState *AGUIRunState, projectName,
// 2. Send basic state snapshot (always succeeds)
sendBasicStateSnapshot(c, runState, projectName, sessionName)

// 3. Compact stored events and send MESSAGES_SNAPSHOT
// Per AG-UI spec: compact at read-time, not write-time
events, err := loadEventsForRun(sessionName, runID)
if err != nil {
log.Printf("AGUI: Failed to load events for %s: %v", sessionName, err)
}

if len(events) > 0 {
messages := CompactEvents(events)

if len(messages) > 0 {
snapshot := &types.MessagesSnapshotEvent{
BaseEvent: types.NewBaseEvent(types.EventTypeMessagesSnapshot, threadID, runID),
Messages: messages,
}
writeSSEEvent(c.Writer, snapshot)
// 3. Send persisted MESSAGES_SNAPSHOT for conversation history
// The runner emits MESSAGES_SNAPSHOT at the end of each run — use it directly.
persistedSnapshot := loadPersistedSnapshot(sessionName)
if messages := messagesFromSnapshot(persistedSnapshot); len(messages) > 0 {
snapshot := &types.MessagesSnapshotEvent{
BaseEvent: types.NewBaseEvent(types.EventTypeMessagesSnapshot, threadID, runID),
Messages: messages,
}
writeSSEEvent(c.Writer, snapshot)
}
}

Expand Down Expand Up @@ -964,14 +1011,9 @@ func HandleAGUIHistory(c *gin.Context) {
}
runID := c.Query("runId")

// Compact events to messages
var messages []types.Message
if runID != "" {
events, err := loadEventsForRun(sessionName, runID)
if err == nil {
messages = CompactEvents(events)
}
}
// Load messages from persisted MESSAGES_SNAPSHOT (no compaction needed)
persistedSnapshot := loadPersistedSnapshot(sessionName)
messages := messagesFromSnapshot(persistedSnapshot)

// Get runs for this session
runs := getRunsForSession(sessionName)
Expand Down
Loading
Loading