diff --git a/PolyPilot.Tests/StateChangeCoalescerTests.cs b/PolyPilot.Tests/StateChangeCoalescerTests.cs new file mode 100644 index 0000000000..92b9fe53ca --- /dev/null +++ b/PolyPilot.Tests/StateChangeCoalescerTests.cs @@ -0,0 +1,94 @@ +using Microsoft.Extensions.DependencyInjection; +using PolyPilot.Services; + +namespace PolyPilot.Tests; + +/// +/// Tests for NotifyStateChangedCoalesced — verifies that rapid-fire calls +/// coalesce into fewer OnStateChanged invocations. +/// +public class StateChangeCoalescerTests +{ + private readonly StubChatDatabase _chatDb = new(); + private readonly StubServerManager _serverManager = new(); + private readonly StubWsBridgeClient _bridgeClient = new(); + private readonly StubDemoService _demoService = new(); + private readonly RepoManager _repoManager = new(); + private readonly IServiceProvider _serviceProvider; + + public StateChangeCoalescerTests() + { + var services = new ServiceCollection(); + _serviceProvider = services.BuildServiceProvider(); + } + + private CopilotService CreateService() => + new CopilotService(_chatDb, _serverManager, _bridgeClient, _repoManager, _serviceProvider, _demoService); + + [Fact] + public async Task RapidCalls_CoalesceIntoSingleNotification() + { + var svc = CreateService(); + int fireCount = 0; + svc.OnStateChanged += () => Interlocked.Increment(ref fireCount); + + // Fire 20 rapid coalesced notifications + for (int i = 0; i < 20; i++) + svc.NotifyStateChangedCoalesced(); + + // Wait for the coalesce timer to fire (150ms + margin) + await Task.Delay(300); + + // Should have coalesced into 1 notification (not 20) + Assert.InRange(fireCount, 1, 3); + } + + [Fact] + public async Task SingleCall_FiresExactlyOnce() + { + var svc = CreateService(); + int fireCount = 0; + svc.OnStateChanged += () => Interlocked.Increment(ref fireCount); + + svc.NotifyStateChangedCoalesced(); + await Task.Delay(300); + + Assert.Equal(1, fireCount); + } + + [Fact] + public async Task SeparateBursts_FireSeparately() + { + var svc = CreateService(); + int fireCount = 0; + svc.OnStateChanged += () => Interlocked.Increment(ref fireCount); + + // First burst + for (int i = 0; i < 10; i++) + svc.NotifyStateChangedCoalesced(); + await Task.Delay(300); + + // Second burst after timer has fired + for (int i = 0; i < 10; i++) + svc.NotifyStateChangedCoalesced(); + await Task.Delay(300); + + // Each burst should produce ~1 notification + Assert.InRange(fireCount, 2, 4); + } + + [Fact] + public void ImmediateNotify_StillWorks() + { + var svc = CreateService(); + int fireCount = 0; + svc.OnStateChanged += () => Interlocked.Increment(ref fireCount); + + // Direct OnStateChanged (not coalesced) should fire immediately + svc.NotifyStateChanged(); + Assert.Equal(1, fireCount); + + svc.NotifyStateChanged(); + Assert.Equal(2, fireCount); + } +} diff --git a/PolyPilot/Services/CopilotService.Bridge.cs b/PolyPilot/Services/CopilotService.Bridge.cs index 82efcd69b2..de12c6b573 100644 --- a/PolyPilot/Services/CopilotService.Bridge.cs +++ b/PolyPilot/Services/CopilotService.Bridge.cs @@ -49,7 +49,7 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati _bridgeClient.OnStateChanged += () => { SyncRemoteSessions(); - InvokeOnUI(() => OnStateChanged?.Invoke()); + NotifyStateChangedCoalesced(); }; _bridgeClient.OnReposListReceived += payload => { diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index a995a9a03c..44274e4cf7 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -308,7 +308,7 @@ void Invoke(Action action) if (state.Info.ProcessingPhase < 3) { state.Info.ProcessingPhase = 3; // Working - Invoke(() => OnStateChanged?.Invoke()); + NotifyStateChangedCoalesced(); } var startToolName = toolStart.Data.ToolName ?? "unknown"; var startCallId = toolStart.Data.ToolCallId ?? ""; @@ -419,7 +419,7 @@ void Invoke(Action action) Invoke(() => { if (isPermissionDenial) - OnStateChanged?.Invoke(); + NotifyStateChangedCoalesced(); OnToolCompleted?.Invoke(sessionName, completeCallId, resultStr, !hasError); OnActivity?.Invoke(sessionName, hasError ? "❌ Tool failed" : "✅ Tool completed"); }); @@ -449,7 +449,7 @@ void Invoke(Action action) { OnTurnStart?.Invoke(sessionName); OnActivity?.Invoke(sessionName, "🤔 Thinking..."); - if (phaseAdvancedToThinking) OnStateChanged?.Invoke(); + if (phaseAdvancedToThinking) NotifyStateChangedCoalesced(); }); break; diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index b8e637a995..6b1c4d75ee 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -202,6 +202,7 @@ internal CopilotService(IChatDatabase chatDb, IServerManager serverManager, IWsB _serviceProvider = serviceProvider; _demoService = demoService; _codespaceService = codespaceService ?? new CodespaceService(); + _stateChangedCoalesceTimer = new Timer(FireCoalescedStateChanged, null, Timeout.Infinite, Timeout.Infinite); try { _usageStats = serviceProvider?.GetService(typeof(UsageStatsService)) as UsageStatsService; } catch { } } @@ -328,6 +329,37 @@ internal int RemoveGroupsWhere(Predicate match) public event Action? OnStateChanged; public void NotifyStateChanged() => OnStateChanged?.Invoke(); + + /// + /// Coalesced state change notification. Batches rapid-fire events (tool starts, + /// phase changes, turn starts) into a single OnStateChanged callback within the + /// coalesce window. Use this for high-frequency, non-critical state updates. + /// Critical events (completion, errors, session switches) should still call + /// OnStateChanged?.Invoke() directly for immediate UI response. + /// + private Timer? _stateChangedCoalesceTimer; + private const int StateChangedCoalesceMs = 150; + private int _stateChangedPending; // 0 = idle, 1 = pending + + internal void NotifyStateChangedCoalesced() + { + // Mark as pending — if already pending, the timer will fire and pick it up + if (Interlocked.CompareExchange(ref _stateChangedPending, 1, 0) == 0) + { + try + { + _stateChangedCoalesceTimer?.Change(StateChangedCoalesceMs, Timeout.Infinite); + } + catch (ObjectDisposedException) { } + } + } + + private void FireCoalescedStateChanged(object? _) + { + Interlocked.Exchange(ref _stateChangedPending, 0); + InvokeOnUI(() => OnStateChanged?.Invoke()); + } + public event Action? OnContentReceived; // sessionName, content public event Action? OnError; // sessionName, error public event Action? OnSessionComplete; // sessionName, summary @@ -3371,7 +3403,10 @@ public async ValueTask DisposeAsync() _saveUiStateDebounce?.Dispose(); _saveUiStateDebounce = null; FlushUiState(); - + + _stateChangedCoalesceTimer?.Dispose(); + _stateChangedCoalesceTimer = null; + foreach (var state in _sessions.Values) { CancelProcessingWatchdog(state); diff --git a/PolyPilot/Services/DevTunnelService.cs b/PolyPilot/Services/DevTunnelService.cs index 2a703c93f9..4e0f6bc5e9 100644 --- a/PolyPilot/Services/DevTunnelService.cs +++ b/PolyPilot/Services/DevTunnelService.cs @@ -279,8 +279,9 @@ private async Task TryHostTunnelAsync(ConnectionSettings settings) if (_hostProcess != null && !_hostProcess.HasExited) { try { _hostProcess.Kill(entireProcessTree: true); } catch { } - _hostProcess = null; } + _hostProcess?.Dispose(); + _hostProcess = null; var hostArgs = _tunnelId != null ? $"host {_tunnelId}" @@ -458,6 +459,7 @@ public void Stop() _hostProcess.Kill(entireProcessTree: true); Console.WriteLine("[DevTunnel] Host process killed"); } + _hostProcess?.Dispose(); } catch (Exception ex) { diff --git a/PolyPilot/Services/ServerManager.cs b/PolyPilot/Services/ServerManager.cs index 20711b0e93..2e4db8f5d3 100644 --- a/PolyPilot/Services/ServerManager.cs +++ b/PolyPilot/Services/ServerManager.cs @@ -95,15 +95,13 @@ public async Task StartServerAsync(int port = 4321) SavePidFile(process.Id, port); Console.WriteLine($"[ServerManager] Started copilot server PID {process.Id} on port {port}"); - // Detach stdout/stderr readers so they don't hold the process - _ = Task.Run(async () => - { - try { while (await process.StandardOutput.ReadLineAsync() != null) { } } catch { } - }); - _ = Task.Run(async () => - { - try { while (await process.StandardError.ReadLineAsync() != null) { } } catch { } - }); + // Drain stdout/stderr in parallel; dispose process handle when both streams close. + // The server process itself keeps running — we only release the OS handle. + // Must be parallel: sequential draining deadlocks if stderr fills its pipe buffer + // while stdout drain blocks waiting for the process to exit. + var t1 = Task.Run(async () => { try { while (await process.StandardOutput.ReadLineAsync() != null) { } } catch { } }); + var t2 = Task.Run(async () => { try { while (await process.StandardError.ReadLineAsync() != null) { } } catch { } }); + _ = Task.WhenAll(t1, t2).ContinueWith(_ => process.Dispose()); // Wait for server to become available for (int i = 0; i < 15; i++) @@ -140,6 +138,7 @@ public void StopServer() { var process = Process.GetProcessById(pid.Value); process.Kill(); + process.Dispose(); Console.WriteLine($"[ServerManager] Killed server PID {pid}"); } catch (Exception ex)