diff --git a/PolyPilot.Tests/BridgeDisconnectTests.cs b/PolyPilot.Tests/BridgeDisconnectTests.cs index ac04acae88..ad56ffb4df 100644 --- a/PolyPilot.Tests/BridgeDisconnectTests.cs +++ b/PolyPilot.Tests/BridgeDisconnectTests.cs @@ -265,4 +265,87 @@ public async Task ForceSync_WhenNotStreaming_AlwaysAppliesServerHistory() Assert.Equal(2, session.History.Count); Assert.Equal(2, session.MessageCount); } + + // ===== Stale IsProcessing guard tests ===== + + [Fact] + public async Task SyncRemoteSessions_DoesNotResetIsProcessing_AfterTurnEnd() + { + // Scenario: TurnEnd clears IsProcessing=false on mobile, then a stale + // sessions_list arrives (debounced on server) with IsProcessing=true. + // The TurnEnd guard should prevent re-setting to true. + var svc = CreateRemoteService(); + await AddRemoteSession(svc, "orchestrator"); + var session = svc.GetSession("orchestrator")!; + + // Server starts processing — sessions_list sets IsProcessing=true + _bridgeClient.Sessions = new() { new SessionSummary { Name = "orchestrator", IsProcessing = true } }; + svc.SyncRemoteSessions(); + Assert.True(session.IsProcessing); + + // TurnEnd arrives — clears IsProcessing and sets the guard + session.IsProcessing = false; + svc.SetTurnEndGuardForTesting("orchestrator", true); + + // Stale sessions_list snapshot arrives with IsProcessing=true + _bridgeClient.Sessions = new() { new SessionSummary { Name = "orchestrator", IsProcessing = true } }; + svc.SyncRemoteSessions(); + + // Guard should prevent overwrite — still false + Assert.False(session.IsProcessing); + } + + [Fact] + public async Task SyncRemoteSessions_AllowsIsProcessingTrue_OnInitialSync() + { + // Scenario: Fresh connection — no prior TurnEnd. sessions_list with + // IsProcessing=true should be accepted (no guard entry exists). + var svc = CreateRemoteService(); + _bridgeClient.Sessions = new() { new SessionSummary { Name = "new-session", IsProcessing = true } }; + svc.SyncRemoteSessions(); + + var session = svc.GetSession("new-session"); + Assert.NotNull(session); + Assert.True(session!.IsProcessing); + } + + [Fact] + public async Task TurnStart_ClearsGuard_AllowsSessionsListToSetProcessing() + { + // Scenario: After TurnEnd guard is set, a new TurnStart clears it. + // sessions_list should then be able to set IsProcessing=true. + var svc = CreateRemoteService(); + await AddRemoteSession(svc, "session1"); + var session = svc.GetSession("session1")!; + + // Turn completes — guard is set + session.IsProcessing = false; + svc.SetTurnEndGuardForTesting("session1", true); + + // New turn starts — clear the guard (simulating TurnStart handler) + svc.SetTurnEndGuardForTesting("session1", false); + session.IsProcessing = true; // TurnStart sets this + + // sessions_list confirms processing — guard is gone, should succeed + _bridgeClient.Sessions = new() { new SessionSummary { Name = "session1", IsProcessing = true } }; + svc.SyncRemoteSessions(); + Assert.True(session.IsProcessing); + } + + [Fact] + public async Task SyncRemoteSessions_AllowsSessionsListToClearProcessing() + { + // Scenario: Server says session is done (IsProcessing=false). + // SyncRemoteSessions should always accept false from the server. + var svc = CreateRemoteService(); + await AddRemoteSession(svc, "session1"); + var session = svc.GetSession("session1")!; + + // Session is processing + session.IsProcessing = true; + _bridgeClient.Sessions = new() { new SessionSummary { Name = "session1", IsProcessing = false } }; + svc.SyncRemoteSessions(); + + Assert.False(session.IsProcessing); + } } diff --git a/PolyPilot.Tests/RenderThrottleTests.cs b/PolyPilot.Tests/RenderThrottleTests.cs index 20b33b9297..eae7d1f4cc 100644 --- a/PolyPilot.Tests/RenderThrottleTests.cs +++ b/PolyPilot.Tests/RenderThrottleTests.cs @@ -88,6 +88,33 @@ public void CustomThrottleInterval_Respected() Assert.True(throttle.ShouldRefresh(isSessionSwitch: false, hasCompletedSessions: false)); } + [Fact] + public void Streaming_BypassesThrottle() + { + var throttle = new RenderThrottle(500); + // First call goes through normally + Assert.True(throttle.ShouldRefresh(isSessionSwitch: false, hasCompletedSessions: false)); + + // Immediately after, normal refresh is throttled + Assert.False(throttle.ShouldRefresh(isSessionSwitch: false, hasCompletedSessions: false)); + + // But streaming content always gets through + Assert.True(throttle.ShouldRefresh(isSessionSwitch: false, hasCompletedSessions: false, isStreaming: true)); + } + + [Fact] + public void Streaming_UpdatesLastRefreshTime() + { + var throttle = new RenderThrottle(500); + throttle.SetLastRefresh(DateTime.UtcNow.AddSeconds(-10)); + + // Streaming bypass updates the timestamp + Assert.True(throttle.ShouldRefresh(isSessionSwitch: false, hasCompletedSessions: false, isStreaming: true)); + + // So a normal refresh immediately after is still throttled + Assert.False(throttle.ShouldRefresh(isSessionSwitch: false, hasCompletedSessions: false)); + } + [Fact] public void CompletionRace_OnStateChangedThrottledButHandleCompleteRenders() { diff --git a/PolyPilot.Tests/TestStubs.cs b/PolyPilot.Tests/TestStubs.cs index a54dc98d30..9c4b9176ec 100644 --- a/PolyPilot.Tests/TestStubs.cs +++ b/PolyPilot.Tests/TestStubs.cs @@ -190,6 +190,12 @@ public Task CreateWorktreeAsync(string repoId, string? b public Task FetchImageAsync(string path, CancellationToken ct = default) => Task.FromResult(new FetchImageResponsePayload { Error = "Stub" }); + + // Test helpers for firing events + public void FireTurnStart(string sessionName) => OnTurnStart?.Invoke(sessionName); + public void FireTurnEnd(string sessionName) => OnTurnEnd?.Invoke(sessionName); + public void FireSessionComplete(string sessionName, string summary = "") => OnSessionComplete?.Invoke(sessionName, summary); + public void FireStateChanged() => OnStateChanged?.Invoke(); } internal class StubDemoService : IDemoService diff --git a/PolyPilot/Components/Pages/Dashboard.razor b/PolyPilot/Components/Pages/Dashboard.razor index 2e40b880b7..ef38f81c0f 100644 --- a/PolyPilot/Components/Pages/Dashboard.razor +++ b/PolyPilot/Components/Pages/Dashboard.razor @@ -1018,6 +1018,7 @@ private DateTime _lastRefresh = DateTime.MinValue; private volatile bool _renderDirty; + private volatile bool _contentDirty; // Set when content_delta arrives, cleared after render private Timer? _renderTimer; private Timer? _heartbeatTimer; private readonly ConcurrentDictionary _lastRenderedSnapshot = new(); @@ -1183,8 +1184,11 @@ var active = CopilotService.ActiveSessionName; bool sessionSwitched = active != _lastActiveSession; - // Throttle non-switch state changes to max 2/sec, but always allow completed sessions through - if (!_refreshThrottle.ShouldRefresh(sessionSwitched, completedSessions.Count > 0)) + // Throttle non-switch state changes to max 2/sec, but always allow completed sessions + // and active streaming content through + var isStreaming = _contentDirty; + if (isStreaming) _contentDirty = false; + if (!_refreshThrottle.ShouldRefresh(sessionSwitched, completedSessions.Count > 0, isStreaming)) return; sessions = CopilotService.GetAllSessions().ToList(); @@ -1294,6 +1298,7 @@ if (!streamingBySession.ContainsKey(sessionName)) streamingBySession[sessionName] = ""; streamingBySession[sessionName] += content; + _contentDirty = true; if (sessionName == expandedSession || expandedSession == null) ScheduleRender(); } diff --git a/PolyPilot/Models/RenderThrottle.cs b/PolyPilot/Models/RenderThrottle.cs index ae6f3f4f92..45977c0cab 100644 --- a/PolyPilot/Models/RenderThrottle.cs +++ b/PolyPilot/Models/RenderThrottle.cs @@ -16,10 +16,10 @@ public RenderThrottle(int throttleMs = 500) /// /// Returns true if the refresh should proceed, false if throttled. - /// A refresh always proceeds when hasCompletedSessions is true (turn just finished) - /// or when isSessionSwitch is true. + /// A refresh always proceeds when hasCompletedSessions is true (turn just finished), + /// when isSessionSwitch is true, or when isStreaming is true (content is actively arriving). /// - public bool ShouldRefresh(bool isSessionSwitch, bool hasCompletedSessions) + public bool ShouldRefresh(bool isSessionSwitch, bool hasCompletedSessions, bool isStreaming = false) { if (isSessionSwitch) return true; @@ -33,6 +33,13 @@ public bool ShouldRefresh(bool isSessionSwitch, bool hasCompletedSessions) return true; } + // Always allow during active streaming — content deltas must render promptly + if (isStreaming) + { + _lastRefresh = now; + return true; + } + if ((now - _lastRefresh).TotalMilliseconds < _throttleMs) return false; diff --git a/PolyPilot/Services/CopilotService.Bridge.cs b/PolyPilot/Services/CopilotService.Bridge.cs index 038dab7693..a3ff0fbbdc 100644 --- a/PolyPilot/Services/CopilotService.Bridge.cs +++ b/PolyPilot/Services/CopilotService.Bridge.cs @@ -217,6 +217,9 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati // Increment generation counter — each sub-turn gets a new generation so // a delayed guard removal from a previous sub-turn won't kill this one. _remoteStreamingSessions.AddOrUpdate(s, 1, (_, prev) => prev + 1); + // Clear the TurnEnd guard — a new turn is starting, so sessions_list should be + // allowed to sync IsProcessing=true again. + _recentTurnEndSessions.TryRemove(s, out _); // Set IsProcessing on the UI thread to avoid race with TurnEnd: // When TurnEnd and TurnStart arrive back-to-back, both InvokeOnUI callbacks // are queued. TurnEnd fires first (sets false), then TurnStart fires (sets true). @@ -245,6 +248,8 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati session.ProcessingStartedAt = null; session.ToolCallCount = 0; session.ProcessingPhase = 0; + // Guard against stale sessions_list re-setting IsProcessing=true + _recentTurnEndSessions[s] = DateTime.UtcNow; // Mark last assistant message as complete var lastAssistant = session.History.LastOrDefault(m => m.IsAssistant && !m.IsComplete); if (lastAssistant != null) { lastAssistant.IsComplete = true; lastAssistant.Model = session.Model; } @@ -280,7 +285,22 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati } }); }; - _bridgeClient.OnSessionComplete += (s, sum) => InvokeOnUI(() => OnSessionComplete?.Invoke(s, sum)); + _bridgeClient.OnSessionComplete += (s, sum) => InvokeOnUI(() => + { + // Belt-and-suspenders: also clear IsProcessing on session_complete in case + // the turn_end message was lost or arrived out of order. + var session = GetRemoteSession(s); + if (session != null && session.IsProcessing) + { + Debug($"[BRIDGE-SESSION-COMPLETE] '{session.Name}' clearing stale IsProcessing"); + session.IsProcessing = false; + session.ProcessingStartedAt = null; + session.ToolCallCount = 0; + session.ProcessingPhase = 0; + _recentTurnEndSessions[s] = DateTime.UtcNow; + } + OnSessionComplete?.Invoke(s, sum); + }); _bridgeClient.OnError += (s, e) => InvokeOnUI(() => { // Ignore errors for sessions already deleted locally (e.g., SDK error during dispose) @@ -452,7 +472,7 @@ private void OnConnectivityChanged(object? sender, Microsoft.Maui.Networking.Con /// /// Sync remote session list from WsBridgeClient into our local _sessions dictionary. /// - private void SyncRemoteSessions() + internal void SyncRemoteSessions() { var remoteSessions = _bridgeClient.Sessions; var remoteActive = _bridgeClient.ActiveSessionName; @@ -499,10 +519,20 @@ private void SyncRemoteSessions() // sessions list, which may be stale by the time it arrives. if (!_remoteStreamingSessions.ContainsKey(rs.Name)) { - state.Info.IsProcessing = rs.IsProcessing; - state.Info.ProcessingStartedAt = rs.ProcessingStartedAt; - state.Info.ToolCallCount = rs.ToolCallCount; - state.Info.ProcessingPhase = rs.ProcessingPhase; + // Don't let a stale sessions_list snapshot re-set IsProcessing=true after + // TurnEnd already cleared it. The debounced sessions_list may have been + // captured before CompleteResponse ran on the server. + bool turnEndGuardActive = rs.IsProcessing && + _recentTurnEndSessions.TryGetValue(rs.Name, out var turnEndTime) && + (DateTime.UtcNow - turnEndTime).TotalSeconds < 5; + + if (!turnEndGuardActive) + { + state.Info.IsProcessing = rs.IsProcessing; + state.Info.ProcessingStartedAt = rs.ProcessingStartedAt; + state.Info.ToolCallCount = rs.ToolCallCount; + state.Info.ProcessingPhase = rs.ProcessingPhase; + } state.Info.MessageCount = rs.MessageCount; } if (!string.IsNullOrEmpty(rs.Model)) diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index 09a60e74ea..4afe68534c 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -18,6 +18,10 @@ public partial class CopilotService : IAsyncDisposable private readonly ConcurrentDictionary _recentlyClosedRemoteSessions = new(); // Sessions currently receiving streaming content via bridge events — history sync skipped to avoid duplicates private readonly ConcurrentDictionary _remoteStreamingSessions = new(); + // Sessions whose IsProcessing was recently cleared by a TurnEnd bridge event. + // Prevents SyncRemoteSessions (debounced sessions_list) from overwriting the authoritative + // TurnEnd state with a stale snapshot. Entries auto-expire after 5 seconds. + private readonly ConcurrentDictionary _recentTurnEndSessions = new(); /// /// Drafts queued by "Continue in new session" for the Dashboard to pick up. @@ -37,6 +41,12 @@ internal void SetRemoteStreamingGuardForTesting(string sessionName, bool active) if (active) _remoteStreamingSessions.TryAdd(sessionName, 0); else _remoteStreamingSessions.TryRemove(sessionName, out _); } + /// Test-only: set or clear the TurnEnd guard that prevents stale sessions_list from re-setting IsProcessing. + internal void SetTurnEndGuardForTesting(string sessionName, bool active) + { + if (active) _recentTurnEndSessions[sessionName] = DateTime.UtcNow; + else _recentTurnEndSessions.TryRemove(sessionName, out _); + } // Sessions for which history has already been requested — prevents duplicate request storms private readonly ConcurrentDictionary _requestedHistorySessions = new(); // External session IDs currently being resumed — prevents duplicate SDK connections from rapid double-clicks