Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 59 additions & 4 deletions PolyPilot.Tests/BackgroundTasksIdleTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,17 +228,17 @@ public void ProactiveIdleDefer_SubagentDeferStartedAtTicks_StampedOnBackgroundTa
var source = File.ReadAllText(Path.Combine(GetRepoRoot(),
"PolyPilot", "Services", "CopilotService.Events.cs"));

var handlerStart = source.IndexOf("case SessionBackgroundTasksChangedEvent:");
var handlerStart = source.IndexOf("case SessionBackgroundTasksChangedEvent");
Assert.True(handlerStart >= 0, "SessionBackgroundTasksChangedEvent handler not found");

// Find the next case or closing brace to bound the handler
var handlerEnd = source.IndexOf("case System", handlerStart + 1);
if (handlerEnd < 0) handlerEnd = source.Length;
var handler = source.Substring(handlerStart, handlerEnd - handlerStart);

// Must stamp SubagentDeferStartedAtTicks via CompareExchange
// Must stamp SubagentDeferStartedAtTicks via RefreshDeferredBackgroundTaskTracking
Assert.Contains("RefreshDeferredBackgroundTaskTracking", handler);
Assert.Contains("SubagentDeferStartedAtTicks", handler);
Assert.Contains("CompareExchange", handler);
}

[Fact]
Expand All @@ -249,7 +249,7 @@ public void ProactiveIdleDefer_Handler_DoesNotSetIsProcessingFalse()
var source = File.ReadAllText(Path.Combine(GetRepoRoot(),
"PolyPilot", "Services", "CopilotService.Events.cs"));

var handlerStart = source.IndexOf("case SessionBackgroundTasksChangedEvent:");
var handlerStart = source.IndexOf("case SessionBackgroundTasksChangedEvent");
Assert.True(handlerStart >= 0, "SessionBackgroundTasksChangedEvent handler not found");

var handlerEnd = source.IndexOf("case System", handlerStart + 1);
Expand Down Expand Up @@ -287,6 +287,61 @@ public void ProactiveIdleDefer_CompareExchange_SetsWhenZero()
Assert.Equal(now, field);
}

[Fact]
public void SessionIdle_StalePayload_NotDeferredWhenBgTasksAlreadyConfirmedEmpty()
{
// Regression: session.idle arrives with shells=2 but backgroundTasksChanged already
// confirmed shells=0 (race — CLI snapshotted before completions landed). PolyPilot
// must NOT defer in this case.
//
// The fix uses a sentinel: DeferredBackgroundTaskFingerprint == string.Empty means
// "backgroundTasksChanged explicitly confirmed zero tasks this turn." null means
// "no backgroundTasksChanged event has fired yet" (initial/reset state). Only the
// empty-string sentinel triggers stale detection, preventing false positives when
// session.idle arrives with genuine new tasks before backgroundTasksChanged fires.
var source = File.ReadAllText(Path.Combine(GetRepoRoot(),
"PolyPilot", "Services", "CopilotService.Events.cs"));

var idleHandlerStart = source.IndexOf("case SessionIdleEvent idle:");
Assert.True(idleHandlerStart >= 0, "SessionIdleEvent handler not found");
var idleHandlerEnd = source.IndexOf("case SessionBackgroundTasks", idleHandlerStart + 1);
if (idleHandlerEnd < 0) idleHandlerEnd = source.Length;
var handler = source.Substring(idleHandlerStart, idleHandlerEnd - idleHandlerStart);

// The handler must capture state before RefreshDeferredBackgroundTaskTracking
Assert.Contains("preIdleFingerprint", handler);
Assert.Contains("preIdleTicks", handler);
// Staleness check uses string.Empty sentinel (not null) to distinguish confirmed-empty
// from never-seen — guards against false positives on first idle with genuine tasks
Assert.Contains("idlePayloadIsStale", handler);
Assert.Contains("preIdleFingerprint == string.Empty", handler);
Assert.Contains("preIdleTicks == 0", handler);
Assert.Contains("tracking.Snapshot.HasAny", handler);
// hasActiveTasks must be guarded by !idlePayloadIsStale
Assert.Contains("!idlePayloadIsStale", handler);
}

[Fact]
public void RefreshDeferredBackgroundTaskTracking_SetsEmptyStringSentinel_WhenTasksConfirmedGone()
{
// When backgroundTasksChanged fires with no tasks, RefreshDeferredBackgroundTaskTracking
// must set DeferredBackgroundTaskFingerprint = string.Empty (not null). This sentinel
// is what distinguishes "confirmed empty" from "never seen" (null).
var source = File.ReadAllText(Path.Combine(GetRepoRoot(),
"PolyPilot", "Services", "CopilotService.Events.cs"));

var methodStart = source.IndexOf("private static (BackgroundTaskSnapshot Snapshot, long FirstSeenTicks) RefreshDeferredBackgroundTaskTracking(");
Assert.True(methodStart >= 0, "RefreshDeferredBackgroundTaskTracking not found");
var methodEnd = source.IndexOf("\n private ", methodStart + 1);
if (methodEnd < 0) methodEnd = source.Length;
var method = source.Substring(methodStart, methodEnd - methodStart);

// Must set string.Empty sentinel (not null) when clearing after confirmed-empty event
Assert.Contains("string.Empty", method);
// Must NOT set null when clearing in this path (null is reserved for initial/reset state)
Assert.DoesNotContain("DeferredBackgroundTaskFingerprint = null", method);
}

private static string GetRepoRoot()
{
var dir = AppContext.BaseDirectory;
Expand Down
28 changes: 28 additions & 0 deletions PolyPilot.Tests/MultiAgentRegressionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,34 @@ public void MonitorAndSynthesize_ShouldRedispatchUnstartedWorkers()
Assert.Contains("ExecuteWorkerAsync", block);
}

[Fact]
public void MonitorAndSynthesize_ReflectResume_WaitsForOrchestratorResponse()
{
// When a reflect-mode group resumes after restart and the orchestrator responds with
// new @worker blocks (wants to keep iterating), MonitorAndSynthesizeAsync must wait for
// the response and dispatch those workers — not fire-and-forget the synthesis.
//
// Observed failure: orchestrator emitted @worker:Copilot Cli-worker-1 after resume
// synthesis, but SendPromptAsync (fire-and-forget) meant the response was never
// processed and the worker was never dispatched. Loop stalled indefinitely.
var source = File.ReadAllText(Path.Combine(GetRepoRoot(), "PolyPilot", "Services", "CopilotService.Organization.cs"));

var startIdx = source.IndexOf("private async Task MonitorAndSynthesizeAsync");
Assert.True(startIdx >= 0, "MonitorAndSynthesizeAsync method not found in source");
var block = source.Substring(startIdx, Math.Min(source.Length - startIdx, 14000));

// For reflect groups: must use SendPromptAndWaitAsync (not just SendPromptAsync)
// to get the orchestrator's response and detect new @worker assignments.
Assert.Contains("pending.IsReflect", block);
Assert.Contains("SendPromptAndWaitAsync", block);
// Must parse the orchestrator's response for @worker assignments
Assert.Contains("ParseTaskAssignments", block);
// Must dispatch workers if assignments found
Assert.Contains("resumeAssignments.Count > 0", block);
// Must apply the same collection timeout as the normal reflect loop
Assert.Contains("OrchestratorCollectionTimeout", block);
}

[Fact]
public async Task RetryOrchestration_MissingGroup_DoesNothing()
{
Expand Down
35 changes: 33 additions & 2 deletions PolyPilot/Services/CopilotService.Events.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,12 @@ private static (BackgroundTaskSnapshot Snapshot, long FirstSeenTicks) RefreshDef

if (!snapshot.HasAny)
{
state.DeferredBackgroundTaskFingerprint = null;
// Use string.Empty (not null) to distinguish "confirmed empty by backgroundTasksChanged"
// from "never seen a backgroundTasksChanged event" (null = initial/reset state).
// This sentinel lets the stale-idle detection in SessionIdleEvent tell apart:
// - null → no backgroundTasksChanged has fired this turn → idle payload may be genuine
// - "" → backgroundTasksChanged confirmed zero tasks → idle payload is stale
state.DeferredBackgroundTaskFingerprint = string.Empty;
Interlocked.Exchange(ref state.DeferredBackgroundTasksFirstSeenAtTicks, 0L);
Interlocked.Exchange(ref state.SubagentDeferStartedAtTicks, 0L);
return (snapshot, 0L);
Expand Down Expand Up @@ -851,9 +856,35 @@ void Invoke(Action action)
// KEY FIX: age background tasks by stable fingerprint (agent/shell IDs), not just
// by "current turn." Without this, the same orphaned shell IDs get their timer
// reset on every new prompt and sessions like PROMPT can appear busy forever.

// Capture PolyPilot's own background-task state BEFORE the idle payload can
// overwrite it. If backgroundTasksChanged already confirmed shells=0
// (fingerprint="" sentinel, ticks=0) but the session.idle payload still reports
// shells>0, the payload is stale: shell completions arrived and were processed
// before this idle event, but the CLI snapshotted its state slightly earlier
// (race). PolyPilot's own tracking is the ground truth — don't defer.
//
// Sentinel values for DeferredBackgroundTaskFingerprint:
// null → no backgroundTasksChanged has fired this turn (initial/reset state)
// string.Empty → backgroundTasksChanged explicitly confirmed zero tasks
// non-empty → backgroundTasksChanged reported active tasks with this fingerprint
//
// Only treat the idle as stale when fingerprint == "" (confirmed empty), NOT when
// fingerprint == null (never seen). The null case means tasks may genuinely be
// starting up and backgroundTasksChanged simply hasn't fired yet.
var preIdleFingerprint = state.DeferredBackgroundTaskFingerprint;
var preIdleTicks = Interlocked.Read(ref state.DeferredBackgroundTasksFirstSeenAtTicks);

var tracking = RefreshDeferredBackgroundTaskTracking(state, idle.Data?.BackgroundTasks);
var deferTicks = tracking.FirstSeenTicks;
var hasActiveTasks = HasActiveBackgroundTasks(idle, deferTicks);

bool idlePayloadIsStale = preIdleFingerprint == string.Empty && preIdleTicks == 0 && tracking.Snapshot.HasAny;
if (idlePayloadIsStale)
Debug($"[IDLE-DIAG-STALE] '{sessionName}' session.idle backgroundTasks " +
$"({tracking.Snapshot.AgentCount} agents, {tracking.Snapshot.ShellCount} shells) are stale — " +
$"backgroundTasksChanged already confirmed empty, completing normally");

var hasActiveTasks = !idlePayloadIsStale && HasActiveBackgroundTasks(idle, deferTicks);

// Log zombie expiry here where Debug() is available (HasActiveBackgroundTasks is static)
var zombieAgentCount = tracking.Snapshot.AgentCount;
Expand Down
65 changes: 63 additions & 2 deletions PolyPilot/Services/CopilotService.Organization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3230,8 +3230,69 @@ private async Task MonitorAndSynthesizeAsync(PendingOrchestration pending, Cance
// response still streaming when workers complete and we try to send synthesis).
await WaitForSessionIdleAsync(pending.OrchestratorName, ct);

await SendPromptAsync(pending.OrchestratorName, synthesisPrompt, cancellationToken: ct, originalPrompt: pending.OriginalPrompt);
Debug($"[DISPATCH] Resume synthesis sent to '{pending.OrchestratorName}'");
if (pending.IsReflect)
{
// For reflect-mode groups the orchestrator's response to the synthesis may contain
// new @worker blocks (it wants to keep iterating rather than emit
// [[GROUP_REFLECT_COMPLETE]]). If we just fire-and-forget with SendPromptAsync the
// @worker dispatch is silently dropped and the loop stalls indefinitely.
// Wait for the response and, if new assignments are found, execute one more
// collect+synthesize round so the loop can complete or continue naturally.
var availableWorkers = GetMultiAgentGroupMembers(pending.GroupId)
.Where(m => m != pending.OrchestratorName).ToList();
var orchestratorResponse = await SendPromptAndWaitAsync(
pending.OrchestratorName, synthesisPrompt, ct, originalPrompt: pending.OriginalPrompt);
Debug($"[DISPATCH] Resume reflect: orchestrator response received from '{pending.OrchestratorName}'");

var resumeAssignments = ParseTaskAssignments(orchestratorResponse, availableWorkers);
if (resumeAssignments.Count > 0 && !ct.IsCancellationRequested)
{
Debug($"[DISPATCH] Resume reflect: orchestrator dispatched {resumeAssignments.Count} worker(s) — executing continuation");
AddOrchestratorSystemMessage(pending.OrchestratorName,
$"🔄 Resume: executing {resumeAssignments.Count} worker(s) dispatched by orchestrator...");
InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(pending.GroupId, OrchestratorPhase.Dispatching, "Resume iteration"));

var workerTasks = resumeAssignments
.Select(a => ExecuteWorkerAsync(a.WorkerName, a.Task, pending.OriginalPrompt, ct))
.ToList();

var allDone = Task.WhenAll(workerTasks);
var timeoutTask = Task.Delay(OrchestratorCollectionTimeout, CancellationToken.None);
if (await Task.WhenAny(allDone, timeoutTask) != allDone)
{
Debug($"[DISPATCH] Resume reflect: collection timeout — force-completing stuck workers");
foreach (var a in resumeAssignments)
{
if (_sessions.TryGetValue(a.WorkerName, out var ws))
{
if (ws.Info.IsProcessing)
await ForceCompleteProcessingAsync(a.WorkerName, ws, $"resume reflect collection timeout");
else
ws.ResponseCompletion?.TrySetResult("(worker timed out)");
}
}
}

var resumeResults = new List<WorkerResult>();
for (var i = 0; i < workerTasks.Count; i++)
{
var workerName = i < resumeAssignments.Count ? resumeAssignments[i].WorkerName : "unknown";
try { resumeResults.Add(await workerTasks[i]); }
catch (Exception ex) { resumeResults.Add(new WorkerResult(workerName, null, false, $"Error: {ex.Message}", TimeSpan.Zero)); }
}

InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(pending.GroupId, OrchestratorPhase.Synthesizing, "Resume final"));
var finalSynthesis = BuildSynthesisPrompt(pending.OriginalPrompt, resumeResults);
await WaitForSessionIdleAsync(pending.OrchestratorName, ct);
await SendPromptAsync(pending.OrchestratorName, finalSynthesis, cancellationToken: ct, originalPrompt: pending.OriginalPrompt);
Debug($"[DISPATCH] Resume reflect: final synthesis sent to '{pending.OrchestratorName}'");
}
}
else
{
await SendPromptAsync(pending.OrchestratorName, synthesisPrompt, cancellationToken: ct, originalPrompt: pending.OriginalPrompt);
Debug($"[DISPATCH] Resume synthesis sent to '{pending.OrchestratorName}'");
}
}
catch (Exception ex)
{
Expand Down