Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions PolyPilot.Tests/BridgeDisconnectTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -265,4 +265,87 @@ public async Task ForceSync_WhenNotStreaming_AlwaysAppliesServerHistory()
Assert.Equal(2, session.History.Count);
Assert.Equal(2, session.MessageCount);
}

// ===== Stale IsProcessing guard tests =====

[Fact]
public async Task SyncRemoteSessions_DoesNotResetIsProcessing_AfterTurnEnd()
{
// Scenario: TurnEnd clears IsProcessing=false on mobile, then a stale
// sessions_list arrives (debounced on server) with IsProcessing=true.
// The TurnEnd guard should prevent re-setting to true.
var svc = CreateRemoteService();
await AddRemoteSession(svc, "orchestrator");
var session = svc.GetSession("orchestrator")!;

// Server starts processing — sessions_list sets IsProcessing=true
_bridgeClient.Sessions = new() { new SessionSummary { Name = "orchestrator", IsProcessing = true } };
svc.SyncRemoteSessions();
Assert.True(session.IsProcessing);

// TurnEnd arrives — clears IsProcessing and sets the guard
session.IsProcessing = false;
svc.SetTurnEndGuardForTesting("orchestrator", true);

// Stale sessions_list snapshot arrives with IsProcessing=true
_bridgeClient.Sessions = new() { new SessionSummary { Name = "orchestrator", IsProcessing = true } };
svc.SyncRemoteSessions();

// Guard should prevent overwrite — still false
Assert.False(session.IsProcessing);
}

[Fact]
public async Task SyncRemoteSessions_AllowsIsProcessingTrue_OnInitialSync()
{
// Scenario: Fresh connection — no prior TurnEnd. sessions_list with
// IsProcessing=true should be accepted (no guard entry exists).
var svc = CreateRemoteService();
_bridgeClient.Sessions = new() { new SessionSummary { Name = "new-session", IsProcessing = true } };
svc.SyncRemoteSessions();

var session = svc.GetSession("new-session");
Assert.NotNull(session);
Assert.True(session!.IsProcessing);
}

[Fact]
public async Task TurnStart_ClearsGuard_AllowsSessionsListToSetProcessing()
{
// Scenario: After TurnEnd guard is set, a new TurnStart clears it.
// sessions_list should then be able to set IsProcessing=true.
var svc = CreateRemoteService();
await AddRemoteSession(svc, "session1");
var session = svc.GetSession("session1")!;

// Turn completes — guard is set
session.IsProcessing = false;
svc.SetTurnEndGuardForTesting("session1", true);

// New turn starts — clear the guard (simulating TurnStart handler)
svc.SetTurnEndGuardForTesting("session1", false);
session.IsProcessing = true; // TurnStart sets this

// sessions_list confirms processing — guard is gone, should succeed
_bridgeClient.Sessions = new() { new SessionSummary { Name = "session1", IsProcessing = true } };
svc.SyncRemoteSessions();
Assert.True(session.IsProcessing);
}

[Fact]
public async Task SyncRemoteSessions_AllowsSessionsListToClearProcessing()
{
// Scenario: Server says session is done (IsProcessing=false).
// SyncRemoteSessions should always accept false from the server.
var svc = CreateRemoteService();
await AddRemoteSession(svc, "session1");
var session = svc.GetSession("session1")!;

// Session is processing
session.IsProcessing = true;
_bridgeClient.Sessions = new() { new SessionSummary { Name = "session1", IsProcessing = false } };
svc.SyncRemoteSessions();

Assert.False(session.IsProcessing);
}
}
27 changes: 27 additions & 0 deletions PolyPilot.Tests/RenderThrottleTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,33 @@ public void CustomThrottleInterval_Respected()
Assert.True(throttle.ShouldRefresh(isSessionSwitch: false, hasCompletedSessions: false));
}

[Fact]
public void Streaming_BypassesThrottle()
{
var throttle = new RenderThrottle(500);
// First call goes through normally
Assert.True(throttle.ShouldRefresh(isSessionSwitch: false, hasCompletedSessions: false));

// Immediately after, normal refresh is throttled
Assert.False(throttle.ShouldRefresh(isSessionSwitch: false, hasCompletedSessions: false));

// But streaming content always gets through
Assert.True(throttle.ShouldRefresh(isSessionSwitch: false, hasCompletedSessions: false, isStreaming: true));
}

[Fact]
public void Streaming_UpdatesLastRefreshTime()
{
var throttle = new RenderThrottle(500);
throttle.SetLastRefresh(DateTime.UtcNow.AddSeconds(-10));

// Streaming bypass updates the timestamp
Assert.True(throttle.ShouldRefresh(isSessionSwitch: false, hasCompletedSessions: false, isStreaming: true));

// So a normal refresh immediately after is still throttled
Assert.False(throttle.ShouldRefresh(isSessionSwitch: false, hasCompletedSessions: false));
}

[Fact]
public void CompletionRace_OnStateChangedThrottledButHandleCompleteRenders()
{
Expand Down
6 changes: 6 additions & 0 deletions PolyPilot.Tests/TestStubs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ public Task<WorktreeCreatedPayload> CreateWorktreeAsync(string repoId, string? b

public Task<FetchImageResponsePayload> FetchImageAsync(string path, CancellationToken ct = default)
=> Task.FromResult(new FetchImageResponsePayload { Error = "Stub" });

// Test helpers for firing events
public void FireTurnStart(string sessionName) => OnTurnStart?.Invoke(sessionName);
public void FireTurnEnd(string sessionName) => OnTurnEnd?.Invoke(sessionName);
public void FireSessionComplete(string sessionName, string summary = "") => OnSessionComplete?.Invoke(sessionName, summary);
public void FireStateChanged() => OnStateChanged?.Invoke();
}

internal class StubDemoService : IDemoService
Expand Down
9 changes: 7 additions & 2 deletions PolyPilot/Components/Pages/Dashboard.razor
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,7 @@

private DateTime _lastRefresh = DateTime.MinValue;
private volatile bool _renderDirty;
private volatile bool _contentDirty; // Set when content_delta arrives, cleared after render
private Timer? _renderTimer;
private Timer? _heartbeatTimer;
private readonly ConcurrentDictionary<string, (int MessageCount, bool IsProcessing, int ToolCallCount, int ProcessingPhase, DateTime LastUpdatedAt)> _lastRenderedSnapshot = new();
Expand Down Expand Up @@ -1183,8 +1184,11 @@
var active = CopilotService.ActiveSessionName;
bool sessionSwitched = active != _lastActiveSession;

// Throttle non-switch state changes to max 2/sec, but always allow completed sessions through
if (!_refreshThrottle.ShouldRefresh(sessionSwitched, completedSessions.Count > 0))
// Throttle non-switch state changes to max 2/sec, but always allow completed sessions
// and active streaming content through
var isStreaming = _contentDirty;
if (isStreaming) _contentDirty = false;
if (!_refreshThrottle.ShouldRefresh(sessionSwitched, completedSessions.Count > 0, isStreaming))
return;

sessions = CopilotService.GetAllSessions().ToList();
Expand Down Expand Up @@ -1294,6 +1298,7 @@
if (!streamingBySession.ContainsKey(sessionName))
streamingBySession[sessionName] = "";
streamingBySession[sessionName] += content;
_contentDirty = true;
if (sessionName == expandedSession || expandedSession == null)
ScheduleRender();
}
Expand Down
13 changes: 10 additions & 3 deletions PolyPilot/Models/RenderThrottle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ public RenderThrottle(int throttleMs = 500)

/// <summary>
/// Returns true if the refresh should proceed, false if throttled.
/// A refresh always proceeds when hasCompletedSessions is true (turn just finished)
/// or when isSessionSwitch is true.
/// A refresh always proceeds when hasCompletedSessions is true (turn just finished),
/// when isSessionSwitch is true, or when isStreaming is true (content is actively arriving).
/// </summary>
public bool ShouldRefresh(bool isSessionSwitch, bool hasCompletedSessions)
public bool ShouldRefresh(bool isSessionSwitch, bool hasCompletedSessions, bool isStreaming = false)
{
if (isSessionSwitch)
return true;
Expand All @@ -33,6 +33,13 @@ public bool ShouldRefresh(bool isSessionSwitch, bool hasCompletedSessions)
return true;
}

// Always allow during active streaming — content deltas must render promptly
if (isStreaming)
{
_lastRefresh = now;
return true;
}

if ((now - _lastRefresh).TotalMilliseconds < _throttleMs)
return false;

Expand Down
42 changes: 36 additions & 6 deletions PolyPilot/Services/CopilotService.Bridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati
// Increment generation counter — each sub-turn gets a new generation so
// a delayed guard removal from a previous sub-turn won't kill this one.
_remoteStreamingSessions.AddOrUpdate(s, 1, (_, prev) => prev + 1);
// Clear the TurnEnd guard — a new turn is starting, so sessions_list should be
// allowed to sync IsProcessing=true again.
_recentTurnEndSessions.TryRemove(s, out _);
// Set IsProcessing on the UI thread to avoid race with TurnEnd:
// When TurnEnd and TurnStart arrive back-to-back, both InvokeOnUI callbacks
// are queued. TurnEnd fires first (sets false), then TurnStart fires (sets true).
Expand Down Expand Up @@ -245,6 +248,8 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati
session.ProcessingStartedAt = null;
session.ToolCallCount = 0;
session.ProcessingPhase = 0;
// Guard against stale sessions_list re-setting IsProcessing=true
_recentTurnEndSessions[s] = DateTime.UtcNow;
// Mark last assistant message as complete
var lastAssistant = session.History.LastOrDefault(m => m.IsAssistant && !m.IsComplete);
if (lastAssistant != null) { lastAssistant.IsComplete = true; lastAssistant.Model = session.Model; }
Expand Down Expand Up @@ -280,7 +285,22 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati
}
});
};
_bridgeClient.OnSessionComplete += (s, sum) => InvokeOnUI(() => OnSessionComplete?.Invoke(s, sum));
_bridgeClient.OnSessionComplete += (s, sum) => InvokeOnUI(() =>
{
// Belt-and-suspenders: also clear IsProcessing on session_complete in case
// the turn_end message was lost or arrived out of order.
var session = GetRemoteSession(s);
if (session != null && session.IsProcessing)
{
Debug($"[BRIDGE-SESSION-COMPLETE] '{session.Name}' clearing stale IsProcessing");
session.IsProcessing = false;
session.ProcessingStartedAt = null;
session.ToolCallCount = 0;
session.ProcessingPhase = 0;
_recentTurnEndSessions[s] = DateTime.UtcNow;
}
OnSessionComplete?.Invoke(s, sum);
});
_bridgeClient.OnError += (s, e) => InvokeOnUI(() =>
{
// Ignore errors for sessions already deleted locally (e.g., SDK error during dispose)
Expand Down Expand Up @@ -452,7 +472,7 @@ private void OnConnectivityChanged(object? sender, Microsoft.Maui.Networking.Con
/// <summary>
/// Sync remote session list from WsBridgeClient into our local _sessions dictionary.
/// </summary>
private void SyncRemoteSessions()
internal void SyncRemoteSessions()
{
var remoteSessions = _bridgeClient.Sessions;
var remoteActive = _bridgeClient.ActiveSessionName;
Expand Down Expand Up @@ -499,10 +519,20 @@ private void SyncRemoteSessions()
// sessions list, which may be stale by the time it arrives.
if (!_remoteStreamingSessions.ContainsKey(rs.Name))
{
state.Info.IsProcessing = rs.IsProcessing;
state.Info.ProcessingStartedAt = rs.ProcessingStartedAt;
state.Info.ToolCallCount = rs.ToolCallCount;
state.Info.ProcessingPhase = rs.ProcessingPhase;
// Don't let a stale sessions_list snapshot re-set IsProcessing=true after
// TurnEnd already cleared it. The debounced sessions_list may have been
// captured before CompleteResponse ran on the server.
bool turnEndGuardActive = rs.IsProcessing &&
_recentTurnEndSessions.TryGetValue(rs.Name, out var turnEndTime) &&
(DateTime.UtcNow - turnEndTime).TotalSeconds < 5;

if (!turnEndGuardActive)
{
state.Info.IsProcessing = rs.IsProcessing;
state.Info.ProcessingStartedAt = rs.ProcessingStartedAt;
state.Info.ToolCallCount = rs.ToolCallCount;
state.Info.ProcessingPhase = rs.ProcessingPhase;
}
state.Info.MessageCount = rs.MessageCount;
}
if (!string.IsNullOrEmpty(rs.Model))
Expand Down
10 changes: 10 additions & 0 deletions PolyPilot/Services/CopilotService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ public partial class CopilotService : IAsyncDisposable
private readonly ConcurrentDictionary<string, byte> _recentlyClosedRemoteSessions = new();
// Sessions currently receiving streaming content via bridge events — history sync skipped to avoid duplicates
private readonly ConcurrentDictionary<string, int> _remoteStreamingSessions = new();
// Sessions whose IsProcessing was recently cleared by a TurnEnd bridge event.
// Prevents SyncRemoteSessions (debounced sessions_list) from overwriting the authoritative
// TurnEnd state with a stale snapshot. Entries auto-expire after 5 seconds.
private readonly ConcurrentDictionary<string, DateTime> _recentTurnEndSessions = new();

/// <summary>
/// Drafts queued by "Continue in new session" for the Dashboard to pick up.
Expand All @@ -37,6 +41,12 @@ internal void SetRemoteStreamingGuardForTesting(string sessionName, bool active)
if (active) _remoteStreamingSessions.TryAdd(sessionName, 0);
else _remoteStreamingSessions.TryRemove(sessionName, out _);
}
/// <summary>Test-only: set or clear the TurnEnd guard that prevents stale sessions_list from re-setting IsProcessing.</summary>
internal void SetTurnEndGuardForTesting(string sessionName, bool active)
{
if (active) _recentTurnEndSessions[sessionName] = DateTime.UtcNow;
else _recentTurnEndSessions.TryRemove(sessionName, out _);
}
// Sessions for which history has already been requested — prevents duplicate request storms
private readonly ConcurrentDictionary<string, byte> _requestedHistorySessions = new();
// External session IDs currently being resumed — prevents duplicate SDK connections from rapid double-clicks
Expand Down