Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions PolyPilot.Tests/MultiAgentRegressionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2933,4 +2933,122 @@ public void CreateGroupFromPresetPayload_CoversAllPresetFields()
}

#endregion

#region ReflectionState Reset After Resume (PR #590)

[Fact]
public void ReflectionState_StaleActiveState_CanBeResetAfterResume()
{
    // Scenario: PolyPilot was restarted in the middle of a reflect loop, so the
    // persisted ReflectionState still says IsActive=true. Once the resume flow
    // (MonitorAndSynthesizeAsync) finishes, IsActive has to be flipped back to
    // false so that StartGroupReflection can build fresh state for the next user
    // prompt. The StartedAt comparison restricts the reset to the stale cycle and
    // leaves a cycle the user just created untouched.
    var group = new SessionGroup
    {
        Id = Guid.NewGuid().ToString(),
        Name = "TestOrchestrator",
        IsMultiAgent = true,
        OrchestratorMode = MultiAgentMode.OrchestratorReflect,
        ReflectionState = ReflectionCycle.Create("Fix all bugs", maxIterations: 10)
    };

    // Arrange: put the cycle in a "mid-loop" shape — third iteration, not finished.
    group.ReflectionState.CurrentIteration = 3;
    Assert.True(group.ReflectionState.IsActive);
    Assert.Null(group.ReflectionState.CompletedAt);

    // The production code takes this timestamp from PendingOrchestration.
    var capturedStartedAt = group.ReflectionState.StartedAt;

    // Act: mirror the check ResumeOrchestrationIfPendingAsync performs once the
    // resume has completed — an active cycle that is not newer than the captured
    // timestamp (or whose timestamp is unknown) is considered stale and is closed.
    if (group.ReflectionState is { IsActive: true } activeCycle)
    {
        var timestampUnknown = capturedStartedAt == default;
        if (timestampUnknown || activeCycle.StartedAt <= capturedStartedAt)
        {
            activeCycle.IsActive = false;
            activeCycle.CompletedAt = DateTime.Now;
        }
    }

    // Assert: the cycle is closed and stamped with a completion time...
    Assert.False(group.ReflectionState.IsActive);
    Assert.NotNull(group.ReflectionState.CompletedAt);
    // ...while the iteration counter is left alone (resetting it is RetryOrchestration's job).
    Assert.Equal(3, group.ReflectionState.CurrentIteration);
}

[Fact]
public void ReflectionState_AlreadyInactive_NotModifiedOnResume()
{
    // A cycle that finished normally before the restart is already inactive;
    // the resume path must leave it exactly as it was.
    var group = new SessionGroup
    {
        Id = Guid.NewGuid().ToString(),
        Name = "TestOrchestrator",
        IsMultiAgent = true,
        OrchestratorMode = MultiAgentMode.OrchestratorReflect,
        ReflectionState = ReflectionCycle.Create("Fix all bugs", maxIterations: 5)
    };

    // Arrange: mark the cycle as completed (goal met) ten minutes ago.
    group.ReflectionState.IsActive = false;
    group.ReflectionState.GoalMet = true;
    var completedBeforeRestart = DateTime.Now.AddMinutes(-10);
    group.ReflectionState.CompletedAt = completedBeforeRestart;
    var capturedStartedAt = group.ReflectionState.StartedAt;

    // Act: run the same stale-state check the resume path uses. The outer pattern
    // cannot match because IsActive is false, so nothing inside executes.
    if (group.ReflectionState is { IsActive: true } activeCycle)
    {
        var timestampUnknown = capturedStartedAt == default;
        if (timestampUnknown || activeCycle.StartedAt <= capturedStartedAt)
        {
            activeCycle.IsActive = false;
            activeCycle.CompletedAt = DateTime.Now;
        }
    }

    // Assert: the original completion timestamp and outcome are both intact.
    Assert.Equal(completedBeforeRestart, group.ReflectionState.CompletedAt);
    Assert.True(group.ReflectionState.GoalMet);
}

[Fact]
public void ReflectionState_FreshCycleNotResetByStaleResume()
{
    // Sequential TOCTOU guard. If the user submits a new prompt (which creates a
    // fresh ReflectionState) before the queued InvokeOnUI callback runs, that
    // callback must leave the fresh cycle alone; the StartedAt comparison is what
    // prevents the reset. The stale timestamp is built from DateTime.UtcNow to
    // mirror PendingOrchestration.StartedAt (UTC clock) and then converted to
    // local time to line up with ReflectionCycle.StartedAt (local clock).
    var group = new SessionGroup
    {
        Id = Guid.NewGuid().ToString(),
        Name = "TestOrchestrator",
        IsMultiAgent = true,
        OrchestratorMode = MultiAgentMode.OrchestratorReflect,
        ReflectionState = ReflectionCycle.Create("Original goal", maxIterations: 5)
    };

    // Arrange: a pending-orchestration timestamp from five minutes ago (UTC),
    // normalized to local time the same way the production code does it.
    var pendingUtc = DateTime.UtcNow.AddMinutes(-5);
    var capturedStartedAt = pendingUtc.ToLocalTime();

    // The user sends a new prompt, so StartGroupReflection swaps in a fresh cycle.
    group.ReflectionState = ReflectionCycle.Create("New goal", maxIterations: 3);
    Assert.True(group.ReflectionState.IsActive);
    Assert.True(group.ReflectionState.StartedAt > capturedStartedAt);

    // Act: the queued InvokeOnUI callback now fires, AFTER the fresh cycle exists.
    if (group.ReflectionState is { IsActive: true } activeCycle)
    {
        var timestampUnknown = capturedStartedAt == default;
        if (timestampUnknown || activeCycle.StartedAt <= capturedStartedAt)
        {
            activeCycle.IsActive = false;
            activeCycle.CompletedAt = DateTime.Now;
        }
    }

    // Assert: the fresh cycle is still running and still carries the new goal.
    Assert.True(group.ReflectionState.IsActive);
    Assert.Null(group.ReflectionState.CompletedAt);
    Assert.Equal("New goal", group.ReflectionState.Goal);
}

#endregion
}
47 changes: 32 additions & 15 deletions PolyPilot/Components/Pages/Dashboard.razor
Original file line number Diff line number Diff line change
Expand Up @@ -1919,34 +1919,51 @@
// Diagnostic: log every send attempt so we can trace routing failures
CopilotService.LogDispatchRoute(sessionName, sessionMeta != null, group?.Name, group?.IsMultiAgent, group?.OrchestratorMode, orchSession, isOrchestrator);

if (isOrchestrator && (imagePaths == null || imagePaths.Count == 0))
if (isOrchestrator)
{
AutoStartReflectionIfNeeded(group!.Id, finalPrompt);
_ = CopilotService.SendToMultiAgentGroupAsync(group!.Id, finalPrompt).ContinueWith(t =>
// Always route orchestrator messages through the dispatch pipeline,
// even when images are attached. Images are stripped (the pipeline
// doesn't forward them to workers) but the prompt still gets parsed
// for @worker blocks. Without this, image-bearing messages bypass
// dispatch entirely and @worker blocks in the response go nowhere.
if (imagePaths is { Count: > 0 })
{
var imgSession = sessions.FirstOrDefault(s => s.Name == sessionName);
if (imgSession != null)
{
imgSession.History.Add(ChatMessage.SystemMessage(
"⚠️ Images dropped — orchestration dispatch does not support image attachments. Only the text prompt is dispatched to the orchestrator and workers."));
imgSession.MessageCount = imgSession.History.Count;
}
foreach (var p in imagePaths)
{
try { File.Delete(p); } catch { }
}
}
var groupId = group!.Id;
var groupName = group.Name;
AutoStartReflectionIfNeeded(groupId, finalPrompt);
_ = CopilotService.SendToMultiAgentGroupAsync(groupId, finalPrompt).ContinueWith(t =>
{
if (t.IsFaulted)
{
var msg = t.Exception?.InnerException?.Message ?? t.Exception?.Message ?? "unknown";
InvokeAsync(() =>
{
Console.WriteLine($"[DISPATCH] Error sending to multi-agent group: {msg}");
CopilotService.LogDispatchError($"[DISPATCH] Error sending to multi-agent group '{groupName}': {msg}");
var s = sessions.FirstOrDefault(s => s.Name == sessionName);
if (s != null)
{
s.History.Add(ChatMessage.ErrorMessage($"Dispatch failed: {msg}"));
s.MessageCount = s.History.Count;
}
_ = SafeRefreshAsync();
});
}
});
}
else
{
// If this is an orchestrator but images are present, warn the user
if (isOrchestrator && imagePaths is { Count: > 0 })
{
var imgSession = sessions.FirstOrDefault(s => s.Name == sessionName);
if (imgSession != null)
{
imgSession.History.Add(ChatMessage.SystemMessage(
"⚠️ Images sent directly — orchestration routing does not support images yet."));
imgSession.MessageCount = imgSession.History.Count;
}
}
_ = CopilotService.SendPromptAsync(sessionName, finalPrompt, imagePaths, agentMode: agentMode).ContinueWith(t =>
{
if (t.IsFaulted)
Expand Down
53 changes: 43 additions & 10 deletions PolyPilot/Services/CopilotService.Organization.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,16 @@ public void LogDispatchRoute(string sessionName, bool hasMeta, string? groupName
Debug($"[DISPATCH-ROUTE] session='{sessionName}' hasMeta={hasMeta} group='{groupName}' isMulti={isMulti} mode={mode} orchSession='{orchSession}' isOrch={isOrch}");
}

/// <summary>
/// Records a dispatch failure via <c>Debug</c> so it reaches the diagnostics file
/// instead of only being printed with Console.WriteLine. Dashboard.razor's
/// ContinueWith error handler calls this so that failed dispatches show up in
/// event-diagnostics.log for post-mortem analysis.
/// </summary>
/// <param name="message">The already-formatted error text to log.</param>
public void LogDispatchError(string message) => Debug(message);

/// <summary>
/// Returns the group ID if the given session is an orchestrator in an active multi-agent group.
/// Used by the message queue drain to route dequeued messages through the dispatch pipeline.
Expand Down Expand Up @@ -2996,6 +3006,33 @@ private static void ClearPendingOrchestration()
internal static PendingOrchestration? LoadPendingOrchestrationForTest() => LoadPendingOrchestration();
internal static void ClearPendingOrchestrationForTest() => ClearPendingOrchestration();

/// <summary>
/// Clear pending orchestration file AND reset any stale ReflectionState on the UI thread.
/// All early-exit and completion paths in the resume flow must call this instead of bare
/// ClearPendingOrchestration() to avoid leaving ReflectionState.IsActive=true persisted.
/// </summary>
/// <param name="pending">
/// The pending orchestration being finished. Its <c>GroupId</c> selects the group to
/// inspect, and its <c>StartedAt</c> identifies which reflection cycle counts as stale.
/// </param>
/// <remarks>
/// A reflection cycle is reset only when it is active and either (a) the pending
/// timestamp is unset, (b) the cycle has no StartedAt, or (c) the cycle started at or
/// before the pending timestamp. A cycle started later is left untouched.
/// <c>OnOrchestratorPhaseChanged</c> is raised with <c>OrchestratorPhase.Complete</c>
/// whether or not a reset happened.
/// </remarks>
private void ClearPendingOrchestrationAndResetState(PendingOrchestration pending)
{
    ClearPendingOrchestration();

    // Copy what the callback needs into locals so the lambda does not read
    // `pending` again when it eventually runs.
    var pendingGroupId = pending.GroupId;

    // Evaluate the "no timestamp recorded" sentinel on the RAW value. DateTime
    // equality compares ticks only, so this is true for MinValue of any Kind.
    // Checking after ToLocalTime() would be timezone-dependent: a UTC-kind MinValue
    // shifts to non-zero ticks in UTC+ zones, the sentinel check would fail, and
    // the stale cycle would never be reset.
    var startedAtUnknown = pending.StartedAt == default;

    // Normalize a UTC timestamp to local time before comparing it with the
    // reflection cycle's StartedAt; other kinds are used as-is.
    var staleStartedAt = pending.StartedAt.Kind == DateTimeKind.Utc
        ? pending.StartedAt.ToLocalTime()
        : pending.StartedAt;

    InvokeOnUI(() =>
    {
        var resumeGroup = Organization.Groups.FirstOrDefault(g => g.Id == pendingGroupId);
        if (resumeGroup?.ReflectionState is { IsActive: true } rs
            && (startedAtUnknown || rs.StartedAt == null || rs.StartedAt <= staleStartedAt))
        {
            rs.IsActive = false;
            rs.CompletedAt = DateTime.Now;
            Debug($"[DISPATCH] Resume cleared stale ReflectionState for group '{resumeGroup.Name}' (was iteration {rs.CurrentIteration})");
            SaveOrganization();
        }
        OnOrchestratorPhaseChanged?.Invoke(pendingGroupId, OrchestratorPhase.Complete, null);
    });
}

/// <summary>
/// After session restore, check for a pending orchestration dispatch that was interrupted
/// by an app relaunch. If found, monitor workers and auto-synthesize when all complete.
Expand All @@ -3009,15 +3046,15 @@ internal async Task ResumeOrchestrationIfPendingAsync(CancellationToken ct = def
if (group == null)
{
Debug($"[DISPATCH] Pending orchestration group '{pending.GroupId}' no longer exists — clearing");
ClearPendingOrchestration();
ClearPendingOrchestrationAndResetState(pending);
return;
}

// Verify orchestrator session exists
if (!_sessions.ContainsKey(pending.OrchestratorName))
{
Debug($"[DISPATCH] Pending orchestration orchestrator '{pending.OrchestratorName}' not found — clearing");
ClearPendingOrchestration();
ClearPendingOrchestrationAndResetState(pending);
return;
}

Expand All @@ -3041,8 +3078,7 @@ internal async Task ResumeOrchestrationIfPendingAsync(CancellationToken ct = def
Debug($"[DISPATCH] Resume orchestration failed: {ex.Message}");
AddOrchestratorSystemMessage(pending.OrchestratorName,
$"⚠️ Failed to resume orchestration: {ex.Message}");
ClearPendingOrchestration();
InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(pending.GroupId, OrchestratorPhase.Complete, null));
ClearPendingOrchestrationAndResetState(pending);
}
});
}
Expand Down Expand Up @@ -3076,8 +3112,7 @@ private async Task MonitorAndSynthesizeAsync(PendingOrchestration pending, Cance

if (ct.IsCancellationRequested)
{
ClearPendingOrchestration();
InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(pending.GroupId, OrchestratorPhase.Complete, null));
ClearPendingOrchestrationAndResetState(pending);
return;
}

Expand Down Expand Up @@ -3231,8 +3266,7 @@ private async Task MonitorAndSynthesizeAsync(PendingOrchestration pending, Cance
{
AddOrchestratorSystemMessage(pending.OrchestratorName,
"⚠️ No worker responses available after restart — orchestration aborted.");
ClearPendingOrchestration();
InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(pending.GroupId, OrchestratorPhase.Complete, null));
ClearPendingOrchestrationAndResetState(pending);
return;
}

Expand Down Expand Up @@ -3320,8 +3354,7 @@ private async Task MonitorAndSynthesizeAsync(PendingOrchestration pending, Cance
$"⚠️ Failed to send synthesis: {ex.Message}");
}

ClearPendingOrchestration();
InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(pending.GroupId, OrchestratorPhase.Complete, null));
ClearPendingOrchestrationAndResetState(pending);
}

#endregion
Expand Down