Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions PolyPilot.Tests/StateChangeCoalescerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using Microsoft.Extensions.DependencyInjection;
using PolyPilot.Services;

namespace PolyPilot.Tests;

/// <summary>
/// Tests for NotifyStateChangedCoalesced — verifies that rapid-fire calls
/// coalesce into fewer OnStateChanged invocations.
/// </summary>
public class StateChangeCoalescerTests
{
private readonly StubChatDatabase _chatDb = new();
private readonly StubServerManager _serverManager = new();
private readonly StubWsBridgeClient _bridgeClient = new();
private readonly StubDemoService _demoService = new();
private readonly RepoManager _repoManager = new();
private readonly IServiceProvider _serviceProvider;

public StateChangeCoalescerTests()
{
var services = new ServiceCollection();
_serviceProvider = services.BuildServiceProvider();
}

private CopilotService CreateService() =>
new CopilotService(_chatDb, _serverManager, _bridgeClient, _repoManager, _serviceProvider, _demoService);

[Fact]
public async Task RapidCalls_CoalesceIntoSingleNotification()
{
var svc = CreateService();
int fireCount = 0;
svc.OnStateChanged += () => Interlocked.Increment(ref fireCount);

// Fire 20 rapid coalesced notifications
for (int i = 0; i < 20; i++)
svc.NotifyStateChangedCoalesced();

// Wait for the coalesce timer to fire (150ms + margin)
await Task.Delay(300);

// Should have coalesced into 1 notification (not 20)
Assert.InRange(fireCount, 1, 3);
}

[Fact]
public async Task SingleCall_FiresExactlyOnce()
{
var svc = CreateService();
int fireCount = 0;
svc.OnStateChanged += () => Interlocked.Increment(ref fireCount);

svc.NotifyStateChangedCoalesced();
await Task.Delay(300);

Assert.Equal(1, fireCount);
}

[Fact]
public async Task SeparateBursts_FireSeparately()
{
var svc = CreateService();
int fireCount = 0;
svc.OnStateChanged += () => Interlocked.Increment(ref fireCount);

// First burst
for (int i = 0; i < 10; i++)
svc.NotifyStateChangedCoalesced();
await Task.Delay(300);

// Second burst after timer has fired
for (int i = 0; i < 10; i++)
svc.NotifyStateChangedCoalesced();
await Task.Delay(300);

// Each burst should produce ~1 notification
Assert.InRange(fireCount, 2, 4);
}

[Fact]
public void ImmediateNotify_StillWorks()
{
var svc = CreateService();
int fireCount = 0;
svc.OnStateChanged += () => Interlocked.Increment(ref fireCount);

// Direct OnStateChanged (not coalesced) should fire immediately
svc.NotifyStateChanged();
Assert.Equal(1, fireCount);

svc.NotifyStateChanged();
Assert.Equal(2, fireCount);
}
}
2 changes: 1 addition & 1 deletion PolyPilot/Services/CopilotService.Bridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private async Task InitializeRemoteAsync(ConnectionSettings settings, Cancellati
_bridgeClient.OnStateChanged += () =>
{
SyncRemoteSessions();
InvokeOnUI(() => OnStateChanged?.Invoke());
NotifyStateChangedCoalesced();
};
_bridgeClient.OnReposListReceived += payload =>
{
Expand Down
6 changes: 3 additions & 3 deletions PolyPilot/Services/CopilotService.Events.cs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ void Invoke(Action action)
if (state.Info.ProcessingPhase < 3)
{
state.Info.ProcessingPhase = 3; // Working
Invoke(() => OnStateChanged?.Invoke());
NotifyStateChangedCoalesced();
}
var startToolName = toolStart.Data.ToolName ?? "unknown";
var startCallId = toolStart.Data.ToolCallId ?? "";
Expand Down Expand Up @@ -419,7 +419,7 @@ void Invoke(Action action)
Invoke(() =>
{
if (isPermissionDenial)
OnStateChanged?.Invoke();
NotifyStateChangedCoalesced();
OnToolCompleted?.Invoke(sessionName, completeCallId, resultStr, !hasError);
OnActivity?.Invoke(sessionName, hasError ? "❌ Tool failed" : "✅ Tool completed");
});
Expand Down Expand Up @@ -449,7 +449,7 @@ void Invoke(Action action)
{
OnTurnStart?.Invoke(sessionName);
OnActivity?.Invoke(sessionName, "🤔 Thinking...");
if (phaseAdvancedToThinking) OnStateChanged?.Invoke();
if (phaseAdvancedToThinking) NotifyStateChangedCoalesced();
});
break;

Expand Down
37 changes: 36 additions & 1 deletion PolyPilot/Services/CopilotService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ internal CopilotService(IChatDatabase chatDb, IServerManager serverManager, IWsB
_serviceProvider = serviceProvider;
_demoService = demoService;
_codespaceService = codespaceService ?? new CodespaceService();
_stateChangedCoalesceTimer = new Timer(FireCoalescedStateChanged, null, Timeout.Infinite, Timeout.Infinite);
try { _usageStats = serviceProvider?.GetService(typeof(UsageStatsService)) as UsageStatsService; } catch { }
}

Expand Down Expand Up @@ -328,6 +329,37 @@ internal int RemoveGroupsWhere(Predicate<SessionGroup> match)

public event Action? OnStateChanged;
public void NotifyStateChanged() => OnStateChanged?.Invoke();

/// <summary>
/// Coalesced state change notification. Batches rapid-fire events (tool starts,
/// phase changes, turn starts) into a single OnStateChanged callback within the
/// coalesce window. Use this for high-frequency, non-critical state updates.
/// Critical events (completion, errors, session switches) should still call
/// OnStateChanged?.Invoke() directly for immediate UI response.
/// </summary>
private Timer? _stateChangedCoalesceTimer;
private const int StateChangedCoalesceMs = 150;
private int _stateChangedPending; // 0 = idle, 1 = pending

internal void NotifyStateChangedCoalesced()
{
// Mark as pending — if already pending, the timer will fire and pick it up
if (Interlocked.CompareExchange(ref _stateChangedPending, 1, 0) == 0)
{
try
{
_stateChangedCoalesceTimer?.Change(StateChangedCoalesceMs, Timeout.Infinite);
}
catch (ObjectDisposedException) { }
}
}

private void FireCoalescedStateChanged(object? _)
{
Interlocked.Exchange(ref _stateChangedPending, 0);
InvokeOnUI(() => OnStateChanged?.Invoke());
}

public event Action<string, string>? OnContentReceived; // sessionName, content
public event Action<string, string>? OnError; // sessionName, error
public event Action<string, string>? OnSessionComplete; // sessionName, summary
Expand Down Expand Up @@ -3371,7 +3403,10 @@ public async ValueTask DisposeAsync()
_saveUiStateDebounce?.Dispose();
_saveUiStateDebounce = null;
FlushUiState();


_stateChangedCoalesceTimer?.Dispose();
_stateChangedCoalesceTimer = null;

foreach (var state in _sessions.Values)
{
CancelProcessingWatchdog(state);
Expand Down
4 changes: 3 additions & 1 deletion PolyPilot/Services/DevTunnelService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,9 @@ private async Task<bool> TryHostTunnelAsync(ConnectionSettings settings)
if (_hostProcess != null && !_hostProcess.HasExited)
{
try { _hostProcess.Kill(entireProcessTree: true); } catch { }
_hostProcess = null;
}
_hostProcess?.Dispose();
_hostProcess = null;

var hostArgs = _tunnelId != null
? $"host {_tunnelId}"
Expand Down Expand Up @@ -458,6 +459,7 @@ public void Stop()
_hostProcess.Kill(entireProcessTree: true);
Console.WriteLine("[DevTunnel] Host process killed");
}
_hostProcess?.Dispose();
}
catch (Exception ex)
{
Expand Down
17 changes: 8 additions & 9 deletions PolyPilot/Services/ServerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,13 @@ public async Task<bool> StartServerAsync(int port = 4321)
SavePidFile(process.Id, port);
Console.WriteLine($"[ServerManager] Started copilot server PID {process.Id} on port {port}");

// Detach stdout/stderr readers so they don't hold the process
_ = Task.Run(async () =>
{
try { while (await process.StandardOutput.ReadLineAsync() != null) { } } catch { }
});
_ = Task.Run(async () =>
{
try { while (await process.StandardError.ReadLineAsync() != null) { } } catch { }
});
// Drain stdout/stderr in parallel; dispose process handle when both streams close.
// The server process itself keeps running — we only release the OS handle.
// Must be parallel: sequential draining deadlocks if stderr fills its pipe buffer
// while stdout drain blocks waiting for the process to exit.
var t1 = Task.Run(async () => { try { while (await process.StandardOutput.ReadLineAsync() != null) { } } catch { } });
var t2 = Task.Run(async () => { try { while (await process.StandardError.ReadLineAsync() != null) { } } catch { } });
_ = Task.WhenAll(t1, t2).ContinueWith(_ => process.Dispose());

// Wait for server to become available
for (int i = 0; i < 15; i++)
Expand Down Expand Up @@ -140,6 +138,7 @@ public void StopServer()
{
var process = Process.GetProcessById(pid.Value);
process.Kill();
process.Dispose();
Console.WriteLine($"[ServerManager] Killed server PID {pid}");
}
catch (Exception ex)
Expand Down