Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions PolyPilot/Models/AgentSessionInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,26 @@ public class AgentSessionInfo
public bool IsProcessing { get; set; }
public List<ChatMessage> History { get; } = new();
public List<string> MessageQueue { get; } = new();

/// <summary>
/// Lock for thread-safe access to History. Must be held when mutating History from
/// background threads (SDK events, bridge sync). UI-thread reads can skip the lock
/// when tolerant of stale data, but cross-thread reads (e.g., WsBridgeServer snapshot)
/// must lock.
/// </summary>
public readonly object HistoryLock = new();

/// <summary>
/// Thread-safe snapshot of History. Use this from any non-UI thread that needs
/// to iterate or serialize History without risking concurrent modification.
/// </summary>
public ChatMessage[] GetHistorySnapshot()
{
lock (HistoryLock)
{
return History.ToArray();
}
}

public string? WorkingDirectory { get; set; }
public string? GitBranch { get; set; }
Expand Down Expand Up @@ -61,17 +81,9 @@ public int UnreadCount
{
get
{
try
{
// Snapshot to avoid collection-modified exceptions from background threads
var snapshot = History.ToArray();
return Math.Max(0,
snapshot.Skip(LastReadMessageCount).Count(m => m?.Role == "assistant"));
}
catch
{
return 0;
}
var snapshot = GetHistorySnapshot();
return Math.Max(0,
snapshot.Skip(LastReadMessageCount).Count(m => m?.Role == "assistant"));
}
}

Expand Down
155 changes: 104 additions & 51 deletions PolyPilot/Services/CopilotService.Bridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,42 +93,67 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati
var session = GetRemoteSession(s);
if (session != null)
{
var existing = session.History.LastOrDefault(m => m.IsAssistant && !m.IsComplete);
if (existing != null)
existing.Content += c;
else
session.History.Add(new ChatMessage("assistant", c, DateTime.Now, ChatMessageType.Assistant) { IsComplete = false });
lock (session.HistoryLock)
{
var existing = session.History.LastOrDefault(m => m.IsAssistant && !m.IsComplete);
if (existing != null)
existing.Content += c;
else
{
session.History.Add(new ChatMessage("assistant", c, DateTime.Now, ChatMessageType.Assistant) { IsComplete = false });
session.MessageCount = session.History.Count;
}
}
}
InvokeOnUI(() => OnContentReceived?.Invoke(s, c));
};
_bridgeClient.OnToolStarted += (s, tool, id, input) =>
{
var session = GetRemoteSession(s);
session?.History.Add(ChatMessage.ToolCallMessage(tool, id, input));
if (session != null)
{
lock (session.HistoryLock)
{
session.History.Add(ChatMessage.ToolCallMessage(tool, id, input));
session.MessageCount = session.History.Count;
}
}
InvokeOnUI(() => OnToolStarted?.Invoke(s, tool, id, input));
};
_bridgeClient.OnToolCompleted += (s, id, result, success) =>
{
var session = GetRemoteSession(s);
var toolMsg = session?.History.LastOrDefault(m => m.ToolCallId == id);
if (toolMsg != null)
if (session != null)
{
toolMsg.IsComplete = true;
toolMsg.IsSuccess = success;
toolMsg.Content = result;
lock (session.HistoryLock)
{
var toolMsg = session.History.LastOrDefault(m => m.ToolCallId == id);
if (toolMsg != null)
{
toolMsg.IsComplete = true;
toolMsg.IsSuccess = success;
toolMsg.Content = result;
}
}
}
InvokeOnUI(() => OnToolCompleted?.Invoke(s, id, result, success));
};
_bridgeClient.OnImageReceived += (s, callId, dataUri, caption) =>
{
var session = GetRemoteSession(s);
var toolMsg = session?.History.LastOrDefault(m => m.ToolCallId == callId);
if (toolMsg != null)
if (session != null)
{
// Convert tool call message into an Image message
toolMsg.MessageType = ChatMessageType.Image;
toolMsg.ImageDataUri = dataUri;
toolMsg.Caption = caption;
lock (session.HistoryLock)
{
var toolMsg = session.History.LastOrDefault(m => m.ToolCallId == callId);
if (toolMsg != null)
{
// Convert tool call message into an Image message
toolMsg.MessageType = ChatMessageType.Image;
toolMsg.ImageDataUri = dataUri;
toolMsg.Caption = caption;
}
}
}
InvokeOnUI(() => OnStateChanged?.Invoke());
};
Expand All @@ -138,21 +163,24 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati
var session = GetRemoteSession(s);
if (session != null && !string.IsNullOrEmpty(c))
{
var normalizedReasoningId = ResolveReasoningId(session, rid);
emittedReasoningId = normalizedReasoningId;
var reasoningMsg = FindReasoningMessage(session, normalizedReasoningId);
if (reasoningMsg == null)
lock (session.HistoryLock)
{
reasoningMsg = ChatMessage.ReasoningMessage(normalizedReasoningId);
session.History.Add(reasoningMsg);
session.MessageCount = session.History.Count;
var normalizedReasoningId = ResolveReasoningId(session, rid);
emittedReasoningId = normalizedReasoningId;
var reasoningMsg = FindReasoningMessage(session, normalizedReasoningId);
if (reasoningMsg == null)
{
reasoningMsg = ChatMessage.ReasoningMessage(normalizedReasoningId);
session.History.Add(reasoningMsg);
session.MessageCount = session.History.Count;
}
reasoningMsg.ReasoningId = normalizedReasoningId;
reasoningMsg.IsComplete = false;
reasoningMsg.IsCollapsed = false;
reasoningMsg.Timestamp = DateTime.Now;
MergeReasoningContent(reasoningMsg, c, isDelta: true);
session.LastUpdatedAt = DateTime.Now;
}
reasoningMsg.ReasoningId = normalizedReasoningId;
reasoningMsg.IsComplete = false;
reasoningMsg.IsCollapsed = false;
reasoningMsg.Timestamp = DateTime.Now;
MergeReasoningContent(reasoningMsg, c, isDelta: true);
session.LastUpdatedAt = DateTime.Now;
}
InvokeOnUI(() => OnReasoningReceived?.Invoke(s, emittedReasoningId, c));
};
Expand All @@ -161,19 +189,22 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati
var session = GetRemoteSession(s);
if (session != null)
{
var targets = session.History
.Where(m => m.MessageType == ChatMessageType.Reasoning &&
!m.IsComplete &&
(string.IsNullOrEmpty(rid) || string.Equals(m.ReasoningId, rid, StringComparison.Ordinal)))
.ToList();
foreach (var msg in targets)
lock (session.HistoryLock)
{
msg.IsComplete = true;
msg.IsCollapsed = true;
msg.Timestamp = DateTime.Now;
var targets = session.History
.Where(m => m.MessageType == ChatMessageType.Reasoning &&
!m.IsComplete &&
(string.IsNullOrEmpty(rid) || string.Equals(m.ReasoningId, rid, StringComparison.Ordinal)))
.ToList();
foreach (var msg in targets)
{
msg.IsComplete = true;
msg.IsCollapsed = true;
msg.Timestamp = DateTime.Now;
}
if (targets.Count > 0)
session.LastUpdatedAt = DateTime.Now;
}
if (targets.Count > 0)
session.LastUpdatedAt = DateTime.Now;
}
InvokeOnUI(() => OnReasoningComplete?.Invoke(s, rid));
};
Expand Down Expand Up @@ -202,8 +233,11 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati
session.ToolCallCount = 0;
session.ProcessingPhase = 0;
// Mark last assistant message as complete
var lastAssistant = session.History.LastOrDefault(m => m.IsAssistant && !m.IsComplete);
if (lastAssistant != null) { lastAssistant.IsComplete = true; lastAssistant.Model = session.Model; }
lock (session.HistoryLock)
{
var lastAssistant = session.History.LastOrDefault(m => m.IsAssistant && !m.IsComplete);
if (lastAssistant != null) { lastAssistant.IsComplete = true; lastAssistant.Model = session.Model; }
}
}
OnTurnEnd?.Invoke(s);
});
Expand Down Expand Up @@ -480,22 +514,41 @@ private void SyncRemoteSessions()

if (messages.Count >= s.Info.History.Count)
{
Debug($"SyncRemoteSessions: Syncing {messages.Count} messages for '{name}'");
s.Info.History.Clear();
s.Info.History.AddRange(messages);
lock (s.Info.HistoryLock)
{
if (messages.Count >= s.Info.History.Count)
{
Debug($"SyncRemoteSessions: Syncing {messages.Count} messages for '{name}'");
s.Info.History.Clear();
s.Info.History.AddRange(messages);
}
}
_requestedHistorySessions.TryRemove(name, out _);
}
}
}

// Request history for sessions that have messages but no local history yet
// Request history for sessions that have messages but no local history yet.
// Also retry sessions where a previous request may have failed (server snapshot error):
// if the session has messages, no local history, no cached history, but was already
// requested, clear the flag so it gets re-requested.
foreach (var rs in remoteSessions)
{
if (rs.MessageCount > 0 && _sessions.TryGetValue(rs.Name, out var s) && s.Info.History.Count == 0
&& !_bridgeClient.SessionHistories.ContainsKey(rs.Name)
&& !_requestedHistorySessions.ContainsKey(rs.Name))
&& !_bridgeClient.SessionHistories.ContainsKey(rs.Name))
{
sessionsNeedingHistory.Add(rs.Name);
_requestedHistorySessions[rs.Name] = 0;
if (!_requestedHistorySessions.ContainsKey(rs.Name))
{
sessionsNeedingHistory.Add(rs.Name);
_requestedHistorySessions[rs.Name] = 0;
}
else if (_requestedHistorySessions.TryGetValue(rs.Name, out var retryCount) && retryCount < 5)
{
// Previous request may have failed — retry up to 5 times
sessionsNeedingHistory.Add(rs.Name);
_requestedHistorySessions[rs.Name] = (byte)(retryCount + 1);
}
// else: retry cap exceeded, stop requesting
}
}

Expand Down
Loading