diff --git a/PolyPilot/Models/AgentSessionInfo.cs b/PolyPilot/Models/AgentSessionInfo.cs index ec348706a1..4b3173756a 100644 --- a/PolyPilot/Models/AgentSessionInfo.cs +++ b/PolyPilot/Models/AgentSessionInfo.cs @@ -9,6 +9,26 @@ public class AgentSessionInfo public bool IsProcessing { get; set; } public List History { get; } = new(); public List MessageQueue { get; } = new(); + + /// + /// Lock for thread-safe access to History. Must be held when mutating History from + /// background threads (SDK events, bridge sync). UI-thread reads can skip the lock + /// when tolerant of stale data, but cross-thread reads (e.g., WsBridgeServer snapshot) + /// must lock. + /// + public readonly object HistoryLock = new(); + + /// + /// Thread-safe snapshot of History. Use this from any non-UI thread that needs + /// to iterate or serialize History without risking concurrent modification. + /// + public ChatMessage[] GetHistorySnapshot() + { + lock (HistoryLock) + { + return History.ToArray(); + } + } public string? WorkingDirectory { get; set; } public string? GitBranch { get; set; } @@ -61,17 +81,9 @@ public int UnreadCount { get { - try - { - // Snapshot to avoid collection-modified exceptions from background threads - var snapshot = History.ToArray(); - return Math.Max(0, - snapshot.Skip(LastReadMessageCount).Count(m => m?.Role == "assistant")); - } - catch - { - return 0; - } + var snapshot = GetHistorySnapshot(); + return Math.Max(0, + snapshot.Skip(LastReadMessageCount).Count(m => m?.Role == "assistant")); } } diff --git a/PolyPilot/Services/CopilotService.Bridge.cs b/PolyPilot/Services/CopilotService.Bridge.cs index a66dccd6c0..6f7925581f 100644 --- a/PolyPilot/Services/CopilotService.Bridge.cs +++ b/PolyPilot/Services/CopilotService.Bridge.cs @@ -93,42 +93,67 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati var session = GetRemoteSession(s); if (session != null) { - var existing = session.History.LastOrDefault(m => m.IsAssistant && !m.IsComplete); - if (existing != null) - existing.Content += c; - else - session.History.Add(new ChatMessage("assistant", c, DateTime.Now, ChatMessageType.Assistant) { IsComplete = false }); + lock (session.HistoryLock) + { + var existing = session.History.LastOrDefault(m => m.IsAssistant && !m.IsComplete); + if (existing != null) + existing.Content += c; + else + { + session.History.Add(new ChatMessage("assistant", c, DateTime.Now, ChatMessageType.Assistant) { IsComplete = false }); + session.MessageCount = session.History.Count; + } + } } InvokeOnUI(() => OnContentReceived?.Invoke(s, c)); }; _bridgeClient.OnToolStarted += (s, tool, id, input) => { var session = GetRemoteSession(s); - session?.History.Add(ChatMessage.ToolCallMessage(tool, id, input)); + if (session != null) + { + lock (session.HistoryLock) + { + session.History.Add(ChatMessage.ToolCallMessage(tool, id, input)); + session.MessageCount = session.History.Count; + } + } InvokeOnUI(() => OnToolStarted?.Invoke(s, tool, id, input)); }; _bridgeClient.OnToolCompleted += (s, id, result, success) => { var session = GetRemoteSession(s); - var toolMsg = session?.History.LastOrDefault(m => m.ToolCallId == id); - if (toolMsg != null) + if (session != null) { - toolMsg.IsComplete = true; - toolMsg.IsSuccess = success; - toolMsg.Content = result; + lock (session.HistoryLock) + { + var toolMsg = session.History.LastOrDefault(m => m.ToolCallId == id); + if (toolMsg != null) + { + toolMsg.IsComplete = true; + toolMsg.IsSuccess = success; + toolMsg.Content = result; + } + } } InvokeOnUI(() => OnToolCompleted?.Invoke(s, id, result, success)); }; _bridgeClient.OnImageReceived += (s, callId, dataUri, caption) => { var session = GetRemoteSession(s); - var toolMsg = session?.History.LastOrDefault(m => m.ToolCallId == callId); - if (toolMsg != null) + if (session != null) { - // Convert tool call message into an Image message - toolMsg.MessageType = ChatMessageType.Image; - toolMsg.ImageDataUri = dataUri; - toolMsg.Caption = caption; + lock (session.HistoryLock) + { + var toolMsg = session.History.LastOrDefault(m => m.ToolCallId == callId); + if (toolMsg != null) + { + // Convert tool call message into an Image message + toolMsg.MessageType = ChatMessageType.Image; + toolMsg.ImageDataUri = dataUri; + toolMsg.Caption = caption; + } + } } InvokeOnUI(() => OnStateChanged?.Invoke()); }; @@ -138,21 +163,24 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati var session = GetRemoteSession(s); if (session != null && !string.IsNullOrEmpty(c)) { - var normalizedReasoningId = ResolveReasoningId(session, rid); - emittedReasoningId = normalizedReasoningId; - var reasoningMsg = FindReasoningMessage(session, normalizedReasoningId); - if (reasoningMsg == null) + lock (session.HistoryLock) { - reasoningMsg = ChatMessage.ReasoningMessage(normalizedReasoningId); - session.History.Add(reasoningMsg); - session.MessageCount = session.History.Count; + var normalizedReasoningId = ResolveReasoningId(session, rid); + emittedReasoningId = normalizedReasoningId; + var reasoningMsg = FindReasoningMessage(session, normalizedReasoningId); + if (reasoningMsg == null) + { + reasoningMsg = ChatMessage.ReasoningMessage(normalizedReasoningId); + session.History.Add(reasoningMsg); + session.MessageCount = session.History.Count; + } + reasoningMsg.ReasoningId = normalizedReasoningId; + reasoningMsg.IsComplete = false; + reasoningMsg.IsCollapsed = false; + reasoningMsg.Timestamp = DateTime.Now; + MergeReasoningContent(reasoningMsg, c, isDelta: true); + session.LastUpdatedAt = DateTime.Now; } - reasoningMsg.ReasoningId = normalizedReasoningId; - reasoningMsg.IsComplete = false; - reasoningMsg.IsCollapsed = false; - reasoningMsg.Timestamp = DateTime.Now; - MergeReasoningContent(reasoningMsg, c, isDelta: true); - session.LastUpdatedAt = DateTime.Now; } InvokeOnUI(() => OnReasoningReceived?.Invoke(s, emittedReasoningId, c)); }; @@ -161,19 +189,22 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati var session = GetRemoteSession(s); if (session != null) { - var targets = session.History - .Where(m => m.MessageType == ChatMessageType.Reasoning && - !m.IsComplete && - (string.IsNullOrEmpty(rid) || string.Equals(m.ReasoningId, rid, StringComparison.Ordinal))) - .ToList(); - foreach (var msg in targets) + lock (session.HistoryLock) { - msg.IsComplete = true; - msg.IsCollapsed = true; - msg.Timestamp = DateTime.Now; + var targets = session.History + .Where(m => m.MessageType == ChatMessageType.Reasoning && + !m.IsComplete && + (string.IsNullOrEmpty(rid) || string.Equals(m.ReasoningId, rid, StringComparison.Ordinal))) + .ToList(); + foreach (var msg in targets) + { + msg.IsComplete = true; + msg.IsCollapsed = true; + msg.Timestamp = DateTime.Now; + } + if (targets.Count > 0) + session.LastUpdatedAt = DateTime.Now; } - if (targets.Count > 0) - session.LastUpdatedAt = DateTime.Now; } InvokeOnUI(() => OnReasoningComplete?.Invoke(s, rid)); }; @@ -202,8 +233,11 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati session.ToolCallCount = 0; session.ProcessingPhase = 0; // Mark last assistant message as complete - var lastAssistant = session.History.LastOrDefault(m => m.IsAssistant && !m.IsComplete); - if (lastAssistant != null) { lastAssistant.IsComplete = true; lastAssistant.Model = session.Model; } + lock (session.HistoryLock) + { + var lastAssistant = session.History.LastOrDefault(m => m.IsAssistant && !m.IsComplete); + if (lastAssistant != null) { lastAssistant.IsComplete = true; lastAssistant.Model = session.Model; } + } } OnTurnEnd?.Invoke(s); }); @@ -480,22 +514,41 @@ private void SyncRemoteSessions() if (messages.Count >= s.Info.History.Count) { - Debug($"SyncRemoteSessions: Syncing {messages.Count} messages for '{name}'"); - s.Info.History.Clear(); - s.Info.History.AddRange(messages); + lock (s.Info.HistoryLock) + { + if (messages.Count >= s.Info.History.Count) + { + Debug($"SyncRemoteSessions: Syncing {messages.Count} messages for '{name}'"); + s.Info.History.Clear(); + s.Info.History.AddRange(messages); + } + } + _requestedHistorySessions.TryRemove(name, out _); } } } - // Request history for sessions that have messages but no local history yet + // Request history for sessions that have messages but no local history yet. + // Also retry sessions where a previous request may have failed (server snapshot error): + // if the session has messages, no local history, no cached history, but was already + // requested, clear the flag so it gets re-requested. foreach (var rs in remoteSessions) { if (rs.MessageCount > 0 && _sessions.TryGetValue(rs.Name, out var s) && s.Info.History.Count == 0 - && !_bridgeClient.SessionHistories.ContainsKey(rs.Name) - && !_requestedHistorySessions.ContainsKey(rs.Name)) + && !_bridgeClient.SessionHistories.ContainsKey(rs.Name)) { - sessionsNeedingHistory.Add(rs.Name); - _requestedHistorySessions[rs.Name] = 0; + if (!_requestedHistorySessions.ContainsKey(rs.Name)) + { + sessionsNeedingHistory.Add(rs.Name); + _requestedHistorySessions[rs.Name] = 0; + } + else if (_requestedHistorySessions.TryGetValue(rs.Name, out var retryCount) && retryCount < 5) + { + // Previous request may have failed — retry up to 5 times + sessionsNeedingHistory.Add(rs.Name); + _requestedHistorySessions[rs.Name] = (byte)(retryCount + 1); + } + // else: retry cap exceeded, stop requesting } } diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index c108f71434..36db4c932c 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -148,11 +148,15 @@ private void ApplyReasoningUpdate(SessionState state, string sessionName, string // Register in pending map BEFORE posting to UI thread — this prevents // rapid consecutive deltas from creating duplicates state.PendingReasoningMessages[normalizedReasoningId] = reasoningMsg; - // Must add to History on UI thread to avoid concurrent List mutation + // Must add to History on UI thread; hold HistoryLock to guard against + // concurrent bridge background thread mutations (OnContentReceived, etc.) InvokeOnUI(() => { - state.Info.History.Add(reasoningMsg); - state.Info.MessageCount = state.Info.History.Count; + lock (state.Info.HistoryLock) + { + state.Info.History.Add(reasoningMsg); + state.Info.MessageCount = state.Info.History.Count; + } // Remove from pending — now findable via History search state.PendingReasoningMessages.TryRemove(normalizedReasoningId, out _); }); @@ -179,26 +183,35 @@ private void ApplyReasoningUpdate(SessionState state, string sessionName, string private void CompleteReasoningMessages(SessionState state, string sessionName) { - var openReasoningMessages = state.Info.History - .Where(m => m.MessageType == ChatMessageType.Reasoning && !m.IsComplete) - .ToList(); - if (openReasoningMessages.Count == 0) return; - + List openReasoningMessages; var completedIds = new List(); - foreach (var msg in openReasoningMessages) + // Capture content snapshots for DB writes outside the lock + var dbUpdates = new List<(string sessionId, string reasoningId, string? content)>(); + lock (state.Info.HistoryLock) { - msg.IsComplete = true; - msg.IsCollapsed = true; - msg.Timestamp = DateTime.Now; - if (!string.IsNullOrEmpty(msg.ReasoningId)) + openReasoningMessages = state.Info.History + .Where(m => m.MessageType == ChatMessageType.Reasoning && !m.IsComplete) + .ToList(); + foreach (var msg in openReasoningMessages) { - completedIds.Add(msg.ReasoningId); - if (!string.IsNullOrEmpty(state.Info.SessionId)) - _ = _chatDb.UpdateReasoningContentAsync(state.Info.SessionId, msg.ReasoningId, msg.Content, true); + msg.IsComplete = true; + msg.IsCollapsed = true; + msg.Timestamp = DateTime.Now; + if (!string.IsNullOrEmpty(msg.ReasoningId)) + { + completedIds.Add(msg.ReasoningId); + if (!string.IsNullOrEmpty(state.Info.SessionId)) + dbUpdates.Add((state.Info.SessionId, msg.ReasoningId, msg.Content)); + } } + if (openReasoningMessages.Count > 0) + state.Info.LastUpdatedAt = DateTime.Now; } + if (openReasoningMessages.Count == 0) return; + + foreach (var (sessionId, reasoningId, content) in dbUpdates) + _ = _chatDb.UpdateReasoningContentAsync(sessionId, reasoningId, content ?? "", true); - state.Info.LastUpdatedAt = DateTime.Now; InvokeOnUI(() => { foreach (var reasoningId in completedIds) @@ -308,12 +321,15 @@ void Invoke(Action action) if (!FilteredTools.Contains(startToolName)) { // Deduplicate: SDK replays events on resume/reconnect — update existing - var existingTool = state.Info.History.FirstOrDefault(m => m.ToolCallId == startCallId); - if (existingTool != null) + lock (state.Info.HistoryLock) { - // Update with potentially fresher data - if (!string.IsNullOrEmpty(toolInput)) existingTool.ToolInput = toolInput; - break; + var existingTool = state.Info.History.FirstOrDefault(m => m.ToolCallId == startCallId); + if (existingTool != null) + { + // Update with potentially fresher data + if (!string.IsNullOrEmpty(toolInput)) existingTool.ToolInput = toolInput; + break; + } } // Flush any accumulated assistant text before adding tool message, @@ -325,7 +341,10 @@ void Invoke(Action action) Invoke(() => { FlushCurrentResponse(state); - state.Info.History.Add(imgPlaceholder); + lock (state.Info.HistoryLock) + { + state.Info.History.Add(imgPlaceholder); + } OnToolStarted?.Invoke(sessionName, startToolName, startCallId, toolInput); }); } @@ -335,7 +354,10 @@ void Invoke(Action action) Invoke(() => { FlushCurrentResponse(state); - state.Info.History.Add(toolMsg); + lock (state.Info.HistoryLock) + { + state.Info.History.Add(toolMsg); + } OnToolStarted?.Invoke(sessionName, startToolName, startCallId, toolInput); OnActivity?.Invoke(sessionName, $"🔧 Running {startToolName}..."); }); @@ -363,27 +385,32 @@ void Invoke(Action action) if (resultStr == "Intent logged") break; - // Update the matching tool message in history - var histToolMsg = state.Info.History.LastOrDefault(m => m.ToolCallId == completeCallId); - if (histToolMsg != null) + // Update the matching tool message in history — hold the lock for the full + // lookup + mutation sequence to prevent the UI thread from reading a + // partially-updated message (e.g. IsComplete=true but Content still null). + lock (state.Info.HistoryLock) { - var effectiveToolName = completeToolName ?? histToolMsg.ToolName; - if (effectiveToolName == ShowImageTool.ToolName && !hasError) - { - // Convert tool call placeholder into an Image message - (string? imgPath, string? imgCaption) = ShowImageTool.ParseResult(resultStr); - histToolMsg.MessageType = ChatMessageType.Image; - histToolMsg.ImagePath = imgPath; - histToolMsg.Caption = imgCaption; - histToolMsg.IsComplete = true; - histToolMsg.IsSuccess = true; - histToolMsg.Content = resultStr; - } - else + var histToolMsg = state.Info.History.LastOrDefault(m => m.ToolCallId == completeCallId); + if (histToolMsg != null) { - histToolMsg.IsComplete = true; - histToolMsg.IsSuccess = !hasError; - histToolMsg.Content = resultStr; + var effectiveToolName = completeToolName ?? histToolMsg.ToolName; + if (effectiveToolName == ShowImageTool.ToolName && !hasError) + { + // Convert tool call placeholder into an Image message + (string? imgPath, string? imgCaption) = ShowImageTool.ParseResult(resultStr); + histToolMsg.MessageType = ChatMessageType.Image; + histToolMsg.ImagePath = imgPath; + histToolMsg.Caption = imgCaption; + histToolMsg.IsComplete = true; + histToolMsg.IsSuccess = true; + histToolMsg.Content = resultStr; + } + else + { + histToolMsg.IsComplete = true; + histToolMsg.IsSuccess = !hasError; + histToolMsg.Content = resultStr; + } } } @@ -681,22 +708,33 @@ private void FlushCurrentResponse(SessionState state) // Dedup guard: if this exact text was already flushed (e.g., SDK replayed events // after resume and content was re-appended to CurrentResponse), don't duplicate. - var lastAssistant = state.Info.History.LastOrDefault(m => - m.Role == "assistant" && m.MessageType != ChatMessageType.ToolCall); - if (lastAssistant?.Content == text) + ChatMessage? msg = null; + bool isDuplicate = false; + lock (state.Info.HistoryLock) + { + var lastAssistant = state.Info.History.LastOrDefault(m => + m.Role == "assistant" && m.MessageType != ChatMessageType.ToolCall); + if (lastAssistant?.Content == text) + { + isDuplicate = true; + state.CurrentResponse.Clear(); + state.HasReceivedDeltasThisTurn = false; + } + else + { + msg = new ChatMessage("assistant", text, DateTime.Now) { Model = state.Info.Model }; + state.Info.History.Add(msg); + state.Info.MessageCount = state.Info.History.Count; + } + } + if (isDuplicate) { Debug($"[DEDUP] FlushCurrentResponse skipped duplicate content ({text.Length} chars) for session '{state.Info.Name}'"); - state.CurrentResponse.Clear(); - state.HasReceivedDeltasThisTurn = false; return; } - var msg = new ChatMessage("assistant", text, DateTime.Now) { Model = state.Info.Model }; - state.Info.History.Add(msg); - state.Info.MessageCount = state.Info.History.Count; - if (!string.IsNullOrEmpty(state.Info.SessionId)) - _ = _chatDb.AddMessageAsync(state.Info.SessionId, msg); + _ = _chatDb.AddMessageAsync(state.Info.SessionId, msg!); // Track code suggestions from accumulated response segment _usageStats?.TrackCodeSuggestion(text); @@ -760,18 +798,22 @@ private void CompleteResponse(SessionState state, long? expectedGeneration = nul state.HasUsedToolsThisTurn = false; state.Info.IsResumed = false; // Clear after first successful turn var response = state.CurrentResponse.ToString(); + ChatMessage? completionMsg = null; if (!string.IsNullOrWhiteSpace(response)) { - var msg = new ChatMessage("assistant", response, DateTime.Now) { Model = state.Info.Model }; - state.Info.History.Add(msg); - state.Info.MessageCount = state.Info.History.Count; - // If user is viewing this session, keep it read - if (state.Info.Name == _activeSessionName) - state.Info.LastReadMessageCount = state.Info.History.Count; + completionMsg = new ChatMessage("assistant", response, DateTime.Now) { Model = state.Info.Model }; + lock (state.Info.HistoryLock) + { + state.Info.History.Add(completionMsg); + state.Info.MessageCount = state.Info.History.Count; + // If user is viewing this session, keep it read + if (state.Info.Name == _activeSessionName) + state.Info.LastReadMessageCount = state.Info.History.Count; + } - // Write-through to DB + // Write-through to DB (outside lock — async fire-and-forget) if (!string.IsNullOrEmpty(state.Info.SessionId)) - _ = _chatDb.AddMessageAsync(state.Info.SessionId, msg); + _ = _chatDb.AddMessageAsync(state.Info.SessionId, completionMsg); // Track code suggestions from final response segment _usageStats?.TrackCodeSuggestion(response); diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index 759334ecce..ab17db4901 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -1786,7 +1786,10 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis var session = GetRemoteSession(sessionName); if (session != null && !skipHistoryMessage) { - session.History.Add(ChatMessage.UserMessage(prompt)); + lock (session.HistoryLock) + { + session.History.Add(ChatMessage.UserMessage(prompt)); + } } if (session != null) session.IsProcessing = true; @@ -2029,8 +2032,11 @@ public async Task AbortSessionAsync(string sessionName) if (!string.IsNullOrEmpty(partialResponse)) { var msg = new ChatMessage("assistant", partialResponse, DateTime.Now) { Model = state.Info.Model }; - state.Info.History.Add(msg); - state.Info.MessageCount = state.Info.History.Count; + lock (state.Info.HistoryLock) + { + state.Info.History.Add(msg); + state.Info.MessageCount = state.Info.History.Count; + } if (!string.IsNullOrEmpty(state.Info.SessionId)) _ = _chatDb.AddMessageAsync(state.Info.SessionId, msg); } @@ -2482,8 +2488,11 @@ public void ClearHistory(string name) { if (_sessions.TryGetValue(name, out var state)) { - state.Info.History.Clear(); - state.Info.MessageCount = 0; + lock (state.Info.HistoryLock) + { + state.Info.History.Clear(); + state.Info.MessageCount = 0; + } OnStateChanged?.Invoke(); } } diff --git a/PolyPilot/Services/WsBridgeServer.cs b/PolyPilot/Services/WsBridgeServer.cs index f2a774a876..cef71fa955 100644 --- a/PolyPilot/Services/WsBridgeServer.cs +++ b/PolyPilot/Services/WsBridgeServer.cs @@ -880,6 +880,12 @@ private async Task SendPersistedToClient(string clientId, WebSocket ws, Cancella await SendToClientAsync(clientId, ws, msg, ct); } + /// + /// Server-side maximum for history payloads. Even if the client requests unlimited history, + /// cap to this to prevent massive WebSocket payloads from blocking the channel. + /// + private const int MaxHistoryPayloadMessages = 500; + private async Task SendSessionHistoryToClient(string clientId, WebSocket ws, string sessionName, int? limit, CancellationToken ct) { if (_copilot == null) return; @@ -887,28 +893,23 @@ private async Task SendSessionHistoryToClient(string clientId, WebSocket ws, str var session = _copilot.GetSession(sessionName); if (session == null) return; - // Take a defensive snapshot — History is a plain List that may be + // Thread-safe snapshot via lock — History is a plain List that may be // modified concurrently by SDK event handlers on background threads. - // ToArray() uses Array.Copy internally so it won't throw InvalidOperationException, - // but can hit ArgumentOutOfRangeException if the list resizes during copy. - // On failure, skip sending entirely — never send an empty authoritative payload - // (that would make the client think the session has no history with no recovery path). - ChatMessage[] snapshot; - try { snapshot = session.History.ToArray(); } - catch (Exception ex) - { - Console.WriteLine($"[WsBridge] History snapshot failed for '{sessionName}': {ex.Message}"); - return; - } + var snapshot = session.GetHistorySnapshot(); var totalCount = snapshot.Length; + + // Apply server-side cap, then client-requested limit + var effectiveLimit = limit.HasValue + ? Math.Min(limit.Value, MaxHistoryPayloadMessages) + : MaxHistoryPayloadMessages; // Apply limit — take the most recent N messages List messagesToSend; bool hasMore; - if (limit.HasValue && limit.Value < totalCount) + if (effectiveLimit < totalCount) { - messagesToSend = snapshot.Skip(totalCount - limit.Value).ToList(); + messagesToSend = snapshot.Skip(totalCount - effectiveLimit).ToList(); hasMore = true; } else @@ -952,7 +953,7 @@ private SessionsListPayload BuildSessionsListPayload() Name = s.Name, Model = s.Model, CreatedAt = s.CreatedAt, - MessageCount = s.History.Count, + MessageCount = s.MessageCount, IsProcessing = s.IsProcessing, SessionId = s.SessionId, WorkingDirectory = s.WorkingDirectory,