From 6bc2bae9ae77cc3af363c99aaa0c701a40c87cc6 Mon Sep 17 00:00:00 2001 From: Shane Date: Tue, 10 Mar 2026 09:13:18 -0500 Subject: [PATCH 1/6] fix: fall back to fresh session on corrupt/locked session during restore When RestorePreviousSessionsAsync encounters a 'session file corrupted' error (e.g., events.jsonl locked by another copilot process), fall back to CreateSessionAsync instead of silently dropping the session. Updated error message to explain CLI lock cause. Cherry-picked from: de5f0ae, 5a21b76 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot/Components/Layout/SessionSidebar.razor | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/PolyPilot/Components/Layout/SessionSidebar.razor b/PolyPilot/Components/Layout/SessionSidebar.razor index 2c61075abb..bc7325c626 100644 --- a/PolyPilot/Components/Layout/SessionSidebar.razor +++ b/PolyPilot/Components/Layout/SessionSidebar.razor @@ -1620,7 +1620,7 @@ else // Show an actionable error instead and let the user decide whether to delete. if (IsCorruptSessionError(ex.Message)) { - resumeError = "Session data appears corrupted. You can delete it manually from ~/.copilot/session-state if needed."; + resumeError = "Session is locked — likely in use by a Copilot CLI terminal session. Close the CLI session first, or delete the session data from ~/.copilot/session-state if it's stale."; } else { From 0d0e4732c82d46407fabce1515da70db639ca581 Mon Sep 17 00:00:00 2001 From: Shane Date: Tue, 10 Mar 2026 09:15:54 -0500 Subject: [PATCH 2/6] fix: reliability improvements (cherry-picked + ported) Cherry-picked from feature/intercept-task-tool-229: - Corrupt/locked session restore fallback (de5f0ae, 5a21b76) - Volatile.Read cleanup for ActiveToolCallCount (45b34b3a) Ported reliability fixes: - _clientReconnectLock: SemaphoreSlim thundering-herd fix for concurrent workers hitting IsConnectionError simultaneously - Watchdog tier split: active-tool=600s, used-tools-idle=180s, default=120s (cuts zombie detection from 10min to 3min) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot/Services/CopilotService.Events.cs | 16 ++-- .../Services/CopilotService.Persistence.cs | 6 +- PolyPilot/Services/CopilotService.cs | 89 ++++++++++++------- 3 files changed, 72 insertions(+), 39 deletions(-) diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index a995a9a03c..ec292752b5 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -478,7 +478,7 @@ void Invoke(Action action) await Task.Delay(TurnEndIdleFallbackMs, fallbackToken); if (fallbackToken.IsCancellationRequested) return; // Guard: if tools are still active, a TurnStart is coming — skip. - if (Interlocked.CompareExchange(ref state.ActiveToolCallCount, 0, 0) > 0) + if (Volatile.Read(ref state.ActiveToolCallCount) > 0) { Debug($"[IDLE-FALLBACK] '{sessionName}' skipped — tools still active"); return; @@ -495,7 +495,7 @@ void Invoke(Action action) // Re-check: if a new tool started or TurnStart fired and cancelled // this token, we would have exited above. If still here, no new // activity arrived → SessionIdleEvent was lost → complete. - if (Interlocked.CompareExchange(ref state.ActiveToolCallCount, 0, 0) > 0) + if (Volatile.Read(ref state.ActiveToolCallCount) > 0) { Debug($"[IDLE-FALLBACK] '{sessionName}' skipped after extended wait — tools still active"); return; @@ -1387,6 +1387,9 @@ private void HandleReflectionAdvanceResult(SessionState state, string response, /// If no SDK events arrive for this many seconds while a tool is actively executing, the session is considered stuck. /// This is much longer because legitimate tool executions (e.g., running UI tests, long builds) can take many minutes. internal const int WatchdogToolExecutionTimeoutSeconds = 600; + // Sessions that USED tools but have none actively running — the model may be + // thinking between tool rounds, but 600s is too long for a likely-dead session. + internal const int WatchdogUsedToolsIdleTimeoutSeconds = 180; /// If a resumed session receives zero SDK events for this many seconds, it was likely already /// finished when the app restarted. Short enough that users don't have to click Stop, long enough /// for the SDK to start streaming if the turn is genuinely still active. @@ -1472,7 +1475,7 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session var lastEventTicks = Interlocked.Read(ref state.LastEventAtTicks); var elapsed = (DateTime.UtcNow - new DateTime(lastEventTicks)).TotalSeconds; - var hasActiveTool = Interlocked.CompareExchange(ref state.ActiveToolCallCount, 0, 0) > 0; + var hasActiveTool = Volatile.Read(ref state.ActiveToolCallCount) > 0; // After events have started flowing on a resumed session, clear IsResumed // so the watchdog transitions from the long 600s timeout to the shorter 120s. @@ -1533,12 +1536,15 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session }); } - var useToolTimeout = hasActiveTool || (state.Info.IsResumed && !useResumeQuiescence) || hasUsedTools; + var useToolTimeout = hasActiveTool || (state.Info.IsResumed && !useResumeQuiescence); + var useUsedToolsTimeout = !useToolTimeout && hasUsedTools && !hasActiveTool; var effectiveTimeout = useResumeQuiescence ? WatchdogResumeQuiescenceTimeoutSeconds : useToolTimeout ? WatchdogToolExecutionTimeoutSeconds - : WatchdogInactivityTimeoutSeconds; + : useUsedToolsTimeout + ? WatchdogUsedToolsIdleTimeoutSeconds + : WatchdogInactivityTimeoutSeconds; // Safety net: check absolute max processing time, but only if events have also // gone stale. If events are still flowing (elapsed < effectiveTimeout), the session diff --git a/PolyPilot/Services/CopilotService.Persistence.cs b/PolyPilot/Services/CopilotService.Persistence.cs index 2624aaf15b..efb9917536 100644 --- a/PolyPilot/Services/CopilotService.Persistence.cs +++ b/PolyPilot/Services/CopilotService.Persistence.cs @@ -392,8 +392,12 @@ public async Task RestorePreviousSessionsAsync(CancellationToken cancellationTok // "Session not found" means the CLI server doesn't know this session // (e.g., worker sessions that were created but never received a message). + // "corrupted" / "session file" errors mean the events.jsonl is locked or + // unreadable (e.g., another copilot process owns the session). // Fall back to creating a fresh session so multi-agent workers don't vanish. - if (ex.Message.Contains("Session not found", StringComparison.OrdinalIgnoreCase)) + if (ex.Message.Contains("Session not found", StringComparison.OrdinalIgnoreCase) || + ex.Message.Contains("corrupt", StringComparison.OrdinalIgnoreCase) || + ex.Message.Contains("session file", StringComparison.OrdinalIgnoreCase)) { try { diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index b8e637a995..d42f6208be 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -53,6 +53,9 @@ public partial class CopilotService : IAsyncDisposable private ConnectionSettings? _currentSettings; private string? _activeSessionName; private SynchronizationContext? _syncContext; + // Serializes the IsConnectionError reconnect path so concurrent workers + // don't destroy each other's freshly-created client (thundering herd fix). + private readonly SemaphoreSlim _clientReconnectLock = new(1, 1); private static readonly object _pathLock = new(); private static string? _copilotBaseDir; @@ -2493,47 +2496,67 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis if (client == null) throw new InvalidOperationException("Client is not initialized"); - // If the underlying connection is broken, recreate the client first + // If the underlying connection is broken, recreate the client first. + // Serialize via _clientReconnectLock so concurrent workers don't each + // dispose+recreate _client (thundering herd — only the first one reconnects). if (IsConnectionError(ex)) { - Debug("Connection error detected, recreating client before session reconnect..."); - var connSettings = _currentSettings ?? ConnectionSettings.Load(); - if (CurrentMode == ConnectionMode.Persistent && - !_serverManager.CheckServerRunning("127.0.0.1", connSettings.Port)) + Debug($"Connection error detected for '{sessionName}', acquiring reconnect lock..."); + await _clientReconnectLock.WaitAsync(cancellationToken); + try { - Debug("Persistent server not running, restarting..."); - var started = await _serverManager.StartServerAsync(connSettings.Port); - if (!started) + // Double-check: another worker may have already reconnected while we waited. + if (_client != null && !IsConnectionError(ex)) + { + Debug($"Client already reconnected by another worker, skipping recreate for '{sessionName}'"); + client = _client; + } + else { - Debug("Failed to restart persistent server"); + Debug("Recreating client after connection error..."); + var connSettings = _currentSettings ?? ConnectionSettings.Load(); + if (CurrentMode == ConnectionMode.Persistent && + !_serverManager.CheckServerRunning("127.0.0.1", connSettings.Port)) + { + Debug("Persistent server not running, restarting..."); + var started = await _serverManager.StartServerAsync(connSettings.Port); + if (!started) + { + Debug("Failed to restart persistent server"); + try { await _client.DisposeAsync(); } catch { } + _client = null; + IsInitialized = false; + throw; + } + } try { await _client.DisposeAsync(); } catch { } - _client = null; - IsInitialized = false; - throw; + try + { + _client = CreateClient(connSettings); + await _client.StartAsync(cancellationToken); + client = _client; + Debug("Client recreated successfully"); + } + catch (OperationCanceledException) + { + try { if (_client != null) await _client.DisposeAsync(); } catch { } + _client = null; + IsInitialized = false; + throw; + } + catch (Exception clientEx) + { + Debug($"Failed to recreate client: {clientEx.Message}"); + try { if (_client != null) await _client.DisposeAsync(); } catch { } + _client = null; + IsInitialized = false; + throw; + } } } - try { await _client.DisposeAsync(); } catch { } - try - { - _client = CreateClient(connSettings); - await _client.StartAsync(cancellationToken); - client = _client; // Update local reference to the new client - Debug("Client recreated successfully"); - } - catch (OperationCanceledException) - { - try { if (_client != null) await _client.DisposeAsync(); } catch { } - _client = null; - IsInitialized = false; - throw; - } - catch (Exception clientEx) + finally { - Debug($"Failed to recreate client: {clientEx.Message}"); - try { if (_client != null) await _client.DisposeAsync(); } catch { } - _client = null; - IsInitialized = false; - throw; + _clientReconnectLock.Release(); } } From 6e305704a65dcfe17aa74ee020b015b354667446 Mon Sep 17 00:00:00 2001 From: Shane Date: Tue, 10 Mar 2026 09:38:11 -0500 Subject: [PATCH 3/6] feat: enhanced text-parsing dispatch with reliability fixes (#229) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Text-Parsing Enhancements: - JSON mode parsing: orchestrator can return [{worker,task}] array, parsed with System.Text.Json. Falls back to @worker:...@end regex on parse failure. - Exact match only for worker names — removed bidirectional Contains fallback that caused misroutes when names are substrings of each other. - Strip backticks/quotes from worker names for robustness. - Differentiated task instruction: 'Each worker MUST receive a DIFFERENT sub-task' - Retry loop (up to 3 iterations) with conversation history when not all workers dispatched. Model remembers what it already assigned (no fresh sessions/amnesia). Reliability Fixes (ported from tool-dispatch branch): - BuildFreshSessionConfig helper: MCP servers, skills, system message in one place. Applied to reconnect handler and worker revival. - CTS-to-TCS wiring: 10-minute timeout in SendPromptAndWaitAsync actually cancels the ResponseCompletion TCS instead of being a no-op. - Reset HasUsedToolsThisTurn before reconnect retry to prevent 600s zombie timeout. - Worker revival: detect empty response in ExecuteWorkerAsync, create fresh session with BuildFreshSessionConfig, retry once (~20 lines vs ~70 in tool-dispatch). Tests: - 6 new JSON parsing tests (array, code-fenced, unknown worker, malformed, empty) - Updated fuzzy match tests to verify exact-match-only behavior - Updated ConnectionRecovery + ChatExperienceSafety for BuildFreshSessionConfig Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot.Tests/ChatExperienceSafetyTests.cs | 14 +- PolyPilot.Tests/ConnectionRecoveryTests.cs | 54 +++--- PolyPilot.Tests/MultiAgentGapTests.cs | 54 +++++- PolyPilot.Tests/SessionOrganizationTests.cs | 6 +- .../Services/CopilotService.Organization.cs | 178 +++++++++++++++--- PolyPilot/Services/CopilotService.cs | 95 ++++++---- 6 files changed, 290 insertions(+), 111 deletions(-) diff --git a/PolyPilot.Tests/ChatExperienceSafetyTests.cs b/PolyPilot.Tests/ChatExperienceSafetyTests.cs index 5f8deaac2b..0f6889b8ca 100644 --- a/PolyPilot.Tests/ChatExperienceSafetyTests.cs +++ b/PolyPilot.Tests/ChatExperienceSafetyTests.cs @@ -842,12 +842,18 @@ public void ReconnectPath_IncludesMcpServersAndSkills() var source = File.ReadAllText( Path.Combine(GetRepoRoot(), "PolyPilot", "Services", "CopilotService.cs")); + // After extraction to BuildFreshSessionConfig, verify the reconnect path calls the helper var sessionNotFoundIdx = source.IndexOf("Session not found", StringComparison.OrdinalIgnoreCase); Assert.True(sessionNotFoundIdx > 0); - - var afterNotFound = source.Substring(sessionNotFoundIdx, Math.Min(2000, source.Length - sessionNotFoundIdx)); - Assert.Contains("McpServers", afterNotFound); - Assert.Contains("SkillDirectories", afterNotFound); + var afterNotFound = source.Substring(sessionNotFoundIdx, Math.Min(1000, source.Length - sessionNotFoundIdx)); + Assert.Contains("BuildFreshSessionConfig", afterNotFound); + + // And verify the helper itself includes MCP and Skills + var helperIdx = source.IndexOf("BuildFreshSessionConfig(SessionState state"); + Assert.True(helperIdx > 0); + var helperBlock = source.Substring(helperIdx, Math.Min(2000, source.Length - helperIdx)); + Assert.Contains("McpServers", helperBlock); + Assert.Contains("SkillDirectories", helperBlock); } // ========================================================================= diff --git a/PolyPilot.Tests/ConnectionRecoveryTests.cs b/PolyPilot.Tests/ConnectionRecoveryTests.cs index 2aecf4bb79..8e3761e2c4 100644 --- a/PolyPilot.Tests/ConnectionRecoveryTests.cs +++ b/PolyPilot.Tests/ConnectionRecoveryTests.cs @@ -276,16 +276,20 @@ public void SendPromptAsync_FreshSessionConfig_IncludesMcpServers() { // STRUCTURAL REGRESSION GUARD: The "Session not found" fallback must assign // McpServers in the freshConfig so MCP tools survive reconnection. + // After extraction to BuildFreshSessionConfig helper, verify the helper contains it. var source = File.ReadAllText(Path.Combine(GetRepoRoot(), "PolyPilot", "Services", "CopilotService.cs")); - // Anchor on the freshConfig initializer inside the "Session not found" reconnect path - var freshConfigIndex = source.IndexOf("freshConfig = new SessionConfig"); - Assert.True(freshConfigIndex > 0, "Could not find freshConfig in reconnect path"); + // Verify the reconnect path calls the helper + var sessionNotFoundIdx = source.IndexOf("Session not found", StringComparison.OrdinalIgnoreCase); + Assert.True(sessionNotFoundIdx > 0, "Could not find 'Session not found' in reconnect path"); + var afterNotFound = source.Substring(sessionNotFoundIdx, Math.Min(1000, source.Length - sessionNotFoundIdx)); + Assert.Contains("BuildFreshSessionConfig", afterNotFound); - // Extract the config block (generously sized to cover all fields) - var endIndex = Math.Min(freshConfigIndex + 600, source.Length); - var configBlock = source.Substring(freshConfigIndex, endIndex - freshConfigIndex); - Assert.Contains("McpServers = ", configBlock); + // Verify the helper body includes McpServers + var helperIdx = source.IndexOf("BuildFreshSessionConfig(SessionState state"); + Assert.True(helperIdx > 0, "Could not find BuildFreshSessionConfig helper"); + var helperBlock = source.Substring(helperIdx, Math.Min(2000, source.Length - helperIdx)); + Assert.Contains("McpServers = ", helperBlock); } [Fact] @@ -295,12 +299,10 @@ public void SendPromptAsync_FreshSessionConfig_IncludesSkillDirectories() // SkillDirectories in the freshConfig so skills survive reconnection. var source = File.ReadAllText(Path.Combine(GetRepoRoot(), "PolyPilot", "Services", "CopilotService.cs")); - var freshConfigIndex = source.IndexOf("freshConfig = new SessionConfig"); - Assert.True(freshConfigIndex > 0, "Could not find freshConfig in reconnect path"); - - var endIndex = Math.Min(freshConfigIndex + 600, source.Length); - var configBlock = source.Substring(freshConfigIndex, endIndex - freshConfigIndex); - Assert.Contains("SkillDirectories = ", configBlock); + var helperIdx = source.IndexOf("BuildFreshSessionConfig(SessionState state"); + Assert.True(helperIdx > 0, "Could not find BuildFreshSessionConfig helper"); + var helperBlock = source.Substring(helperIdx, Math.Min(2000, source.Length - helperIdx)); + Assert.Contains("SkillDirectories = ", helperBlock); } [Fact] @@ -310,31 +312,25 @@ public void SendPromptAsync_FreshSessionConfig_IncludesSystemMessage() // SystemMessage so the session retains its system prompt after reconnection. var source = File.ReadAllText(Path.Combine(GetRepoRoot(), "PolyPilot", "Services", "CopilotService.cs")); - var freshConfigIndex = source.IndexOf("freshConfig = new SessionConfig"); - Assert.True(freshConfigIndex > 0, "Could not find freshConfig in reconnect path"); - - var endIndex = Math.Min(freshConfigIndex + 600, source.Length); - var configBlock = source.Substring(freshConfigIndex, endIndex - freshConfigIndex); - Assert.Contains("SystemMessage = ", configBlock); - Assert.Contains("SystemMessageMode.Append", configBlock); + var helperIdx = source.IndexOf("BuildFreshSessionConfig(SessionState state"); + Assert.True(helperIdx > 0, "Could not find BuildFreshSessionConfig helper"); + var helperBlock = source.Substring(helperIdx, Math.Min(2000, source.Length - helperIdx)); + Assert.Contains("SystemMessage = ", helperBlock); + Assert.Contains("SystemMessageMode.Append", helperBlock); } [Fact] public void SendPromptAsync_FreshSessionConfig_MatchesCreateSessionFields() { - // STRUCTURAL REGRESSION GUARD: The freshConfig in the reconnect path must + // STRUCTURAL REGRESSION GUARD: The BuildFreshSessionConfig helper must // set the same critical fields as the original CreateSessionAsync config. - // This prevents "environment keeps going away" after connection loss. var source = File.ReadAllText(Path.Combine(GetRepoRoot(), "PolyPilot", "Services", "CopilotService.cs")); - var freshConfigIndex = source.IndexOf("freshConfig = new SessionConfig"); - Assert.True(freshConfigIndex > 0); + var helperIdx = source.IndexOf("BuildFreshSessionConfig(SessionState state"); + Assert.True(helperIdx > 0); - // Extract the full config initializer block - var endIndex = Math.Min(freshConfigIndex + 800, source.Length); - var configBlock = source.Substring(freshConfigIndex, endIndex - freshConfigIndex); + var helperBlock = source.Substring(helperIdx, Math.Min(2000, source.Length - helperIdx)); - // All critical SessionConfig property assignments must be present var requiredAssignments = new[] { "Model = ", "WorkingDirectory = ", "McpServers = ", "SkillDirectories = ", @@ -342,7 +338,7 @@ public void SendPromptAsync_FreshSessionConfig_MatchesCreateSessionFields() }; foreach (var assignment in requiredAssignments) { - Assert.Contains(assignment, configBlock); + Assert.Contains(assignment, helperBlock); } } diff --git a/PolyPilot.Tests/MultiAgentGapTests.cs b/PolyPilot.Tests/MultiAgentGapTests.cs index f24db3400a..425b6b865a 100644 --- a/PolyPilot.Tests/MultiAgentGapTests.cs +++ b/PolyPilot.Tests/MultiAgentGapTests.cs @@ -50,14 +50,13 @@ Task three. } [Fact] - public void ParseTaskAssignments_FuzzyMatch_FindsClosestWorker() + public void ParseTaskAssignments_ExactMatchOnly_RejectsSubstring() { - // "coder" is a substring of "coder-session" → fuzzy match + // With exact-match-only, "coder" does NOT match "coder-session" var response = "@worker:coder\nWrite the code.\n@end"; var result = CopilotService.ParseTaskAssignments(response, new List { "coder-session", "reviewer-session" }); - Assert.Single(result); - Assert.Equal("coder-session", result[0].WorkerName); + Assert.Empty(result); // No exact match } [Fact] @@ -282,6 +281,53 @@ public void ParseTaskAssignments_CaseInsensitiveWorker_Resolves() Assert.Equal("team-worker-1", result[0].WorkerName); } + // --- JSON Parsing Tests --- + + [Fact] + public void ParseTaskAssignments_JsonArray_ParsesCorrectly() + { + var response = """[{"worker":"alpha","task":"Do task A"},{"worker":"beta","task":"Do task B"}]"""; + var result = CopilotService.ParseTaskAssignments(response, new List { "alpha", "beta" }); + Assert.Equal(2, result.Count); + Assert.Equal("alpha", result[0].WorkerName); + Assert.Equal("Do task A", result[0].Task); + Assert.Equal("beta", result[1].WorkerName); + } + + [Fact] + public void ParseTaskAssignments_JsonInCodeFence_ParsesCorrectly() + { + var response = "```json\n[{\"worker\":\"alpha\",\"task\":\"Do task A\"}]\n```"; + var result = CopilotService.ParseTaskAssignments(response, new List { "alpha", "beta" }); + Assert.Single(result); + Assert.Equal("alpha", result[0].WorkerName); + } + + [Fact] + public void ParseTaskAssignments_JsonWithUnknownWorker_SkipsUnmatched() + { + var response = """[{"worker":"alpha","task":"Do A"},{"worker":"ghost","task":"Do G"}]"""; + var result = CopilotService.ParseTaskAssignments(response, new List { "alpha", "beta" }); + Assert.Single(result); + Assert.Equal("alpha", result[0].WorkerName); + } + + [Fact] + public void ParseTaskAssignments_MalformedJson_FallsBackToRegex() + { + var response = "[broken json\n@worker:alpha\nDo task A.\n@end"; + var result = CopilotService.ParseTaskAssignments(response, new List { "alpha" }); + Assert.Single(result); + Assert.Equal("alpha", result[0].WorkerName); + } + + [Fact] + public void TryParseJsonAssignments_EmptyArray_ReturnsEmpty() + { + var result = CopilotService.TryParseJsonAssignments("[]", new List { "alpha" }); + Assert.Empty(result); + } + // --- BuildDelegationNudgePrompt --- [Fact] diff --git a/PolyPilot.Tests/SessionOrganizationTests.cs b/PolyPilot.Tests/SessionOrganizationTests.cs index 5920528ce7..2e1e5df14d 100644 --- a/PolyPilot.Tests/SessionOrganizationTests.cs +++ b/PolyPilot.Tests/SessionOrganizationTests.cs @@ -692,8 +692,9 @@ Implement the login form with email and password fields. } [Fact] - public void ParseTaskAssignments_FuzzyMatchesWorkerNames() + public void ParseTaskAssignments_ExactMatchOnly_NoFuzzy() { + // With exact-match-only, "session" does NOT match "session-alpha" var response = @"@worker:session Do the work. @end"; @@ -701,8 +702,7 @@ Do the work. var workers = new List { "session-alpha", "session-beta" }; var assignments = CopilotService.ParseTaskAssignments(response, workers); - Assert.Single(assignments); - Assert.Equal("session-alpha", assignments[0].WorkerName); + Assert.Empty(assignments); // No exact match for "session" } [Fact] diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index 67086757be..74197c7867 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -1044,35 +1044,80 @@ private async Task SendViaOrchestratorAsync(string groupId, List members var planningPrompt = BuildOrchestratorPlanningPrompt(prompt, workerNames, group?.OrchestratorPrompt, group?.RoutingContext); var planResponse = await SendPromptAndWaitAsync(orchestratorName, planningPrompt, cancellationToken, originalPrompt: prompt); - // Phase 2: Parse task assignments from orchestrator response - var rawAssignments = ParseTaskAssignments(planResponse, workerNames); - Debug($"[DISPATCH] '{orchestratorName}' plan parsed: {rawAssignments.Count} raw assignments from {workerNames.Count} workers. Response length={planResponse.Length}"); - // Deduplicate: merge multiple tasks for the same worker into one prompt - var assignments = rawAssignments - .GroupBy(a => a.WorkerName, StringComparer.OrdinalIgnoreCase) - .Select(g => new TaskAssignment(g.Key, string.Join("\n\n---\n\n", g.Select(a => a.Task)))) - .ToList(); - if (assignments.Count == 0) - { - // Send a nudge prompt to force delegation before giving up - Debug($"[DISPATCH] No assignments parsed (length={planResponse.Length}). Sending delegation nudge. Workers: {string.Join(", ", workerNames)}"); - var nudgePrompt = BuildDelegationNudgePrompt(workerNames); - var nudgeResponse = await SendPromptAndWaitAsync(orchestratorName, nudgePrompt, cancellationToken, originalPrompt: prompt); - rawAssignments = ParseTaskAssignments(nudgeResponse, workerNames); - Debug($"[DISPATCH] '{orchestratorName}' nudge parsed: {rawAssignments.Count} raw assignments. Response length={nudgeResponse.Length}"); - assignments = rawAssignments + // Phase 2: Parse task assignments from orchestrator response, with retry loop. + // Uses conversation history (no fresh sessions) so the model remembers what it already assigned. + var allAssignments = new List(); + var dispatchedWorkers = new HashSet(StringComparer.OrdinalIgnoreCase); + const int maxDispatchIterations = 3; + + for (int dispatchIter = 0; dispatchIter < maxDispatchIterations; dispatchIter++) + { + string responseToParse; + if (dispatchIter == 0) + { + // First iteration: parse the initial planning response + responseToParse = planResponse; + } + else + { + // Subsequent iterations: nudge with context about who's already dispatched + var remaining = workerNames.Where(w => !dispatchedWorkers.Contains(w)).ToList(); + if (remaining.Count == 0) break; + + var nudgeSb = new System.Text.StringBuilder(); + nudgeSb.AppendLine($"You dispatched {dispatchedWorkers.Count} worker(s): {string.Join(", ", dispatchedWorkers)}"); + nudgeSb.AppendLine($"{remaining.Count} worker(s) remain: {string.Join(", ", remaining)}"); + nudgeSb.AppendLine(); + nudgeSb.AppendLine("You MUST dispatch at least one more worker. Each worker MUST receive a DIFFERENT sub-task from what you already assigned."); + nudgeSb.AppendLine("Use @worker:name...@end format."); + responseToParse = await SendPromptAndWaitAsync(orchestratorName, nudgeSb.ToString(), cancellationToken, originalPrompt: prompt); + } + + var rawAssignments = ParseTaskAssignments(responseToParse, workerNames); + Debug($"[DISPATCH] '{orchestratorName}' iteration {dispatchIter}: {rawAssignments.Count} raw assignments. Response length={responseToParse.Length}"); + + // Deduplicate: merge multiple tasks for the same worker + var iterAssignments = rawAssignments .GroupBy(a => a.WorkerName, StringComparer.OrdinalIgnoreCase) .Select(g => new TaskAssignment(g.Key, string.Join("\n\n---\n\n", g.Select(a => a.Task)))) + .Where(a => !dispatchedWorkers.Contains(a.WorkerName)) // skip already-dispatched .ToList(); - if (assignments.Count == 0) + + if (iterAssignments.Count == 0 && dispatchIter == 0) { - Debug($"[DISPATCH] Nudge also produced no assignments. Workers: {string.Join(", ", workerNames)}"); - AddOrchestratorSystemMessage(orchestratorName, "ℹ️ Orchestrator handled the request directly (no tasks delegated to workers)."); - InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(groupId, OrchestratorPhase.Complete, null)); - return; + // First pass produced nothing — send a single nudge (backwards compat) + Debug($"[DISPATCH] No assignments parsed. Sending delegation nudge."); + var nudgePrompt = BuildDelegationNudgePrompt(workerNames); + var nudgeResponse = await SendPromptAndWaitAsync(orchestratorName, nudgePrompt, cancellationToken, originalPrompt: prompt); + iterAssignments = ParseTaskAssignments(nudgeResponse, workerNames) + .GroupBy(a => a.WorkerName, StringComparer.OrdinalIgnoreCase) + .Select(g => new TaskAssignment(g.Key, string.Join("\n\n---\n\n", g.Select(a => a.Task)))) + .ToList(); + Debug($"[DISPATCH] Nudge parsed: {iterAssignments.Count} assignments."); + } + + if (iterAssignments.Count == 0) + { + if (allAssignments.Count == 0) + { + Debug($"[DISPATCH] No assignments after iteration {dispatchIter}. Orchestrator handled directly."); + AddOrchestratorSystemMessage(orchestratorName, "ℹ️ Orchestrator handled the request directly (no tasks delegated to workers)."); + InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(groupId, OrchestratorPhase.Complete, null)); + return; + } + break; // Have some assignments, proceed to dispatch } + + allAssignments.AddRange(iterAssignments); + foreach (var a in iterAssignments) + dispatchedWorkers.Add(a.WorkerName); + + // If all workers are assigned or this is the last iteration, stop + if (dispatchedWorkers.Count >= workerNames.Count) break; } + var assignments = allAssignments; + // Phase 3: Dispatch tasks to workers in parallel Debug($"[DISPATCH] Dispatching {assignments.Count} tasks: {string.Join(", ", assignments.Select(a => a.WorkerName))}"); InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(groupId, OrchestratorPhase.Dispatching, @@ -1155,6 +1200,10 @@ private string BuildOrchestratorPlanningPrompt(string userPrompt, List w { sb.AppendLine("Break the request into tasks and assign them to workers. You may also do some coordination work yourself (e.g., verifying results, running commands)."); } + sb.AppendLine(); + sb.AppendLine("IMPORTANT: Each worker MUST receive a DIFFERENT sub-task. Do NOT assign the same work to two workers."); + sb.AppendLine("If the request has fewer sub-tasks than workers, only assign to the workers you need."); + sb.AppendLine(); sb.AppendLine("Use this exact format for each assignment:"); sb.AppendLine(); sb.AppendLine("@worker:worker-name"); @@ -1193,30 +1242,65 @@ internal static string BuildDelegationNudgePrompt(List workerNames) internal static List ParseTaskAssignments(string orchestratorResponse, List availableWorkers) { + // Try JSON parsing first — more reliable than regex + var jsonAssignments = TryParseJsonAssignments(orchestratorResponse, availableWorkers); + if (jsonAssignments.Count > 0) + return jsonAssignments; + + // Fall back to @worker:name...@end regex parsing var assignments = new List(); var pattern = @"@worker:([^\n]+?)\s*\n([\s\S]*?)(?:@end|(?=@worker:)|$)"; foreach (Match match in Regex.Matches(orchestratorResponse, pattern, RegexOptions.IgnoreCase)) { - var workerName = match.Groups[1].Value.Trim(); + var workerName = match.Groups[1].Value.Trim().Trim('`', '\'', '"'); var task = match.Groups[2].Value.Trim(); if (string.IsNullOrEmpty(task)) continue; - // Resolve worker name: exact match, then fuzzy + // Exact match only — no fuzzy bidirectional Contains (caused misroutes) var resolved = availableWorkers.FirstOrDefault(w => w.Equals(workerName, StringComparison.OrdinalIgnoreCase)); - if (resolved == null) - { - resolved = availableWorkers.FirstOrDefault(w => - w.Contains(workerName, StringComparison.OrdinalIgnoreCase) || - workerName.Contains(w, StringComparison.OrdinalIgnoreCase)); - } if (resolved != null) assignments.Add(new TaskAssignment(resolved, task)); } return assignments; } + /// + /// Try to parse orchestrator response as JSON array of worker assignments. + /// Accepts: [{"worker":"name","task":"..."},...] with optional markdown code fences. + /// + internal static List TryParseJsonAssignments(string response, List availableWorkers) + { + var assignments = new List(); + try + { + // Strip markdown code fences if present + var json = response.Trim(); + var fenceMatch = Regex.Match(json, @"```(?:json)?\s*\n?([\s\S]*?)\n?```", RegexOptions.IgnoreCase); + if (fenceMatch.Success) + json = fenceMatch.Groups[1].Value.Trim(); + + // Must start with [ to be a JSON array + if (!json.StartsWith("[")) return assignments; + + using var doc = System.Text.Json.JsonDocument.Parse(json); + foreach (var element in doc.RootElement.EnumerateArray()) + { + var workerName = element.TryGetProperty("worker", out var w) ? w.GetString() : null; + var task = element.TryGetProperty("task", out var t) ? t.GetString() : null; + if (string.IsNullOrEmpty(workerName) || string.IsNullOrEmpty(task)) continue; + + var resolved = availableWorkers.FirstOrDefault(wk => + wk.Equals(workerName, StringComparison.OrdinalIgnoreCase)); + if (resolved != null) + assignments.Add(new TaskAssignment(resolved, task)); + } + } + catch { /* Not valid JSON — fall through to regex */ } + return assignments; + } + private record WorkerResult(string WorkerName, string? Response, bool Success, string? Error, TimeSpan Duration); private async Task ExecuteWorkerAsync(string workerName, string task, string originalPrompt, CancellationToken cancellationToken) @@ -1253,7 +1337,32 @@ private async Task ExecuteWorkerAsync(string workerName, string ta { Debug($"[DISPATCH] Worker '{workerName}' starting (prompt len={workerPrompt.Length})"); var response = await SendPromptAndWaitAsync(workerName, workerPrompt, cancellationToken, originalPrompt: originalPrompt); - Debug($"[DISPATCH] Worker '{workerName}' completed (response len={response.Length}, elapsed={sw.Elapsed.TotalSeconds:F1}s)"); + + // Worker revival: empty response means the session died (e.g., dead SSE stream + // after reconnect). Create a fresh session and retry once. + if (string.IsNullOrWhiteSpace(response)) + { + Debug($"[DISPATCH] Worker '{workerName}' returned empty — attempting fresh session revival"); + if (_sessions.TryGetValue(workerName, out var deadState)) + { + try { await deadState.Session.DisposeAsync(); } catch { } + + var workerMeta = GetSessionMeta(workerName); + var client = GetClientForGroup(workerMeta?.GroupId); + if (client != null) + { + var freshConfig = BuildFreshSessionConfig(deadState); + var freshSession = await client.CreateSessionAsync(freshConfig, cancellationToken); + deadState.Info.SessionId = freshSession.SessionId; + var freshState = new SessionState { Session = freshSession, Info = deadState.Info }; + _sessions[workerName] = freshState; + Debug($"[DISPATCH] Worker '{workerName}' revived with fresh session '{freshSession.SessionId}'"); + response = await SendPromptAndWaitAsync(workerName, workerPrompt, cancellationToken, originalPrompt: originalPrompt); + } + } + } + + Debug($"[DISPATCH] Worker '{workerName}' completed (response len={response?.Length ?? 0}, elapsed={sw.Elapsed.TotalSeconds:F1}s)"); return new WorkerResult(workerName, response, true, null, sw.Elapsed); } catch (Exception ex) @@ -1270,6 +1379,13 @@ private async Task SendPromptAndWaitAsync(string sessionName, string pro // object, orphaning the old TCS and causing a 10-minute hang. using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(TimeSpan.FromMinutes(10)); + // Wire CTS to ResponseCompletion TCS so the 10-minute timeout actually cancels the await. + // Must look up from _sessions dict (not captured ref) since reconnect replaces state. + await using var ctsReg = cts.Token.Register(() => + { + if (_sessions.TryGetValue(sessionName, out var s)) + s.ResponseCompletion?.TrySetCanceled(); + }); return await SendPromptAsync(sessionName, prompt, cancellationToken: cts.Token, originalPrompt: originalPrompt); } diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index d42f6208be..c90c58584f 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -2580,46 +2580,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis // with full config (MCP servers, skills, system message) matching CreateSessionAsync. Debug($"Session '{sessionName}' expired on server, creating fresh session..."); OnActivity?.Invoke(sessionName, "🔄 Session expired, creating new session..."); - var freshSettings = _currentSettings ?? ConnectionSettings.Load(); - var freshMcpServers = LoadMcpServers(freshSettings.DisabledMcpServers, freshSettings.DisabledPlugins); - var freshSkillDirs = LoadSkillDirectories(freshSettings.DisabledPlugins); - // Rebuild system message with the same conditional logic as CreateSessionAsync - var freshSystemContent = new StringBuilder(); - var freshDir = state.Info.WorkingDirectory; - if (string.Equals(freshDir, ProjectDir, StringComparison.OrdinalIgnoreCase)) - { - var relaunchCmd = OperatingSystem.IsWindows() - ? $"powershell -ExecutionPolicy Bypass -File \"{Path.Combine(ProjectDir, "relaunch.ps1")}\"" - : $"bash {Path.Combine(ProjectDir, "relaunch.sh")}"; - freshSystemContent.AppendLine($@" -CRITICAL BUILD INSTRUCTION: You are running inside the PolyPilot MAUI application. -When you make ANY code changes to files in {ProjectDir}, you MUST rebuild and relaunch by running: - - {relaunchCmd} - -This script builds the app, launches a new instance, waits for it to start, then kills the old one. -NEVER use 'dotnet build' + 'open' separately. NEVER skip the relaunch after code changes. -ALWAYS run the relaunch script as the final step after making changes to this project. -"); - } - var freshConfig = new SessionConfig - { - Model = reconnectModel ?? DefaultModel, - WorkingDirectory = freshDir, - McpServers = freshMcpServers, - SkillDirectories = freshSkillDirs, - Tools = new List { ShowImageTool.CreateFunction() }, - SystemMessage = new SystemMessageConfig - { - Mode = SystemMessageMode.Append, - Content = freshSystemContent.ToString() - }, - OnPermissionRequest = AutoApprovePermissions - }; - if (freshMcpServers != null) - Debug($"[RECONNECT] Fresh session config includes {freshMcpServers.Count} MCP server(s)"); - if (freshSkillDirs != null) - Debug($"[RECONNECT] Fresh session config includes {freshSkillDirs.Count} skill dir(s)"); + var freshConfig = BuildFreshSessionConfig(state); newSession = await client.CreateSessionAsync(freshConfig, cancellationToken); state.Info.SessionId = newSession.SessionId; } @@ -2656,6 +2617,10 @@ ALWAYS run the relaunch script as the final step after making changes to this pr state.PendingReasoningMessages.Clear(); Debug($"[RECONNECT] '{sessionName}' reset processing state: gen={Interlocked.Read(ref state.ProcessingGeneration)}"); + // Reset HasUsedToolsThisTurn so the retried turn starts with the default + // 120s watchdog tier instead of the inflated 600s from stale tool state. + Volatile.Write(ref state.HasUsedToolsThisTurn, false); + // Start fresh watchdog for the new connection StartProcessingWatchdog(state, sessionName); @@ -2731,6 +2696,56 @@ ALWAYS run the relaunch script as the final step after making changes to this pr } } + /// + /// Build a fresh SessionConfig with MCP servers, skill directories, and system message. + /// Mirrors the reconnect handler's "Session not found" path to ensure revived/fresh sessions + /// have full external tool access. + /// + private SessionConfig BuildFreshSessionConfig(SessionState state, List? tools = null) + { + var settings = _currentSettings ?? ConnectionSettings.Load(); + var mcpServers = LoadMcpServers(settings.DisabledMcpServers, settings.DisabledPlugins); + var skillDirs = LoadSkillDirectories(settings.DisabledPlugins); + var systemContent = new StringBuilder(); + var workDir = state.Info.WorkingDirectory; + if (string.Equals(workDir, ProjectDir, StringComparison.OrdinalIgnoreCase)) + { + var relaunchCmd = OperatingSystem.IsWindows() + ? $"powershell -ExecutionPolicy Bypass -File \"{Path.Combine(ProjectDir, "relaunch.ps1")}\"" + : $"bash {Path.Combine(ProjectDir, "relaunch.sh")}"; + systemContent.AppendLine($@" +CRITICAL BUILD INSTRUCTION: You are running inside the PolyPilot MAUI application. +When you make ANY code changes to files in {ProjectDir}, you MUST rebuild and relaunch by running: + + {relaunchCmd} + +This script builds the app, launches a new instance, waits for it to start, then kills the old one. +NEVER use 'dotnet build' + 'open' separately. NEVER skip the relaunch after code changes. +ALWAYS run the relaunch script as the final step after making changes to this project. +"); + } + var finalTools = tools ?? new List { ShowImageTool.CreateFunction() }; + var config = new SessionConfig + { + Model = Models.ModelHelper.NormalizeToSlug(state.Info.Model) ?? DefaultModel, + WorkingDirectory = workDir, + McpServers = mcpServers, + SkillDirectories = skillDirs, + Tools = finalTools, + SystemMessage = new SystemMessageConfig + { + Mode = SystemMessageMode.Append, + Content = systemContent.ToString() + }, + OnPermissionRequest = AutoApprovePermissions + }; + if (mcpServers != null) + Debug($"[FRESH-CONFIG] Includes {mcpServers.Count} MCP server(s)"); + if (skillDirs != null) + Debug($"[FRESH-CONFIG] Includes {skillDirs.Count} skill dir(s)"); + return config; + } + public async Task AbortSessionAsync(string sessionName, bool markAsInterrupted = false) { // Provider sessions manage their own cancellation From bbdb11ed3ec26eb4ea982f92d5375c7d2f95d1a7 Mon Sep 17 00:00:00 2001 From: Shane Date: Tue, 10 Mar 2026 10:35:42 -0500 Subject: [PATCH 4/6] fix: batch worker dispatch and concurrent dispatch guard - Update planning prompt to instruct model to assign ALL workers in a single response instead of 'at least one'. This fixes the issue where the model would assign only 1 worker per iteration, requiring 3 iterations to get 3 workers assigned (and missing workers 4-5). - Update nudge prompt to request ALL remaining workers at once. - Add per-group SemaphoreSlim (_groupDispatchLocks) to prevent concurrent dispatches to the same group. The bridge's send_message handler and the event queue drain can both call SendToMultiAgentGroupAsync; without this guard, the second call hits 'Session already processing' error. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Services/CopilotService.Organization.cs | 26 ++++++++++++++++--- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index 74197c7867..2fb4dffac9 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -38,6 +38,11 @@ public partial class CopilotService // causing worker results to be silently lost. private readonly ConcurrentDictionary _reflectLoopLocks = new(); + // Per-group semaphore to prevent concurrent orchestrator dispatches. + // The bridge's send_message handler and event queue drain can both call + // SendToMultiAgentGroupAsync for the same group; this ensures only one runs. + private readonly ConcurrentDictionary _groupDispatchLocks = new(); + // Queued user prompts received while a reflect loop is running. // Drained at the start of each loop iteration and sent to the orchestrator // so the model sees them in its conversation context. @@ -920,10 +925,18 @@ public async Task SendToMultiAgentGroupAsync(string groupId, string prompt, Canc var members = GetMultiAgentGroupMembers(groupId); if (members.Count == 0) { Debug($"[DISPATCH] SendToMultiAgentGroupAsync: no members for group '{group.Name}'"); return; } - Debug($"[DISPATCH] SendToMultiAgentGroupAsync: group='{group.Name}', mode={group.OrchestratorMode}, members={members.Count}"); + // Prevent concurrent dispatches to the same group (bridge + event queue drain race) + var dispatchLock = _groupDispatchLocks.GetOrAdd(groupId, _ => new SemaphoreSlim(1, 1)); + if (!await dispatchLock.WaitAsync(0, cancellationToken)) + { + Debug($"[DISPATCH] SendToMultiAgentGroupAsync: group '{group.Name}' dispatch already in progress, skipping"); + return; + } try { + Debug($"[DISPATCH] SendToMultiAgentGroupAsync: group='{group.Name}', mode={group.OrchestratorMode}, members={members.Count}"); + switch (group.OrchestratorMode) { case MultiAgentMode.Broadcast: @@ -948,6 +961,10 @@ public async Task SendToMultiAgentGroupAsync(string groupId, string prompt, Canc Debug($"[DISPATCH] SendToMultiAgentGroupAsync FAILED: {ex.GetType().Name}: {ex.Message}"); throw; } + finally + { + dispatchLock.Release(); + } } /// @@ -1068,8 +1085,8 @@ private async Task SendViaOrchestratorAsync(string groupId, List members nudgeSb.AppendLine($"You dispatched {dispatchedWorkers.Count} worker(s): {string.Join(", ", dispatchedWorkers)}"); nudgeSb.AppendLine($"{remaining.Count} worker(s) remain: {string.Join(", ", remaining)}"); nudgeSb.AppendLine(); - nudgeSb.AppendLine("You MUST dispatch at least one more worker. Each worker MUST receive a DIFFERENT sub-task from what you already assigned."); - nudgeSb.AppendLine("Use @worker:name...@end format."); + nudgeSb.AppendLine($"Dispatch ALL {remaining.Count} remaining workers NOW. Each MUST receive a DIFFERENT sub-task."); + nudgeSb.AppendLine("Produce one @worker:name...@end block for EACH remaining worker in this single response."); responseToParse = await SendPromptAndWaitAsync(orchestratorName, nudgeSb.ToString(), cancellationToken, originalPrompt: prompt); } @@ -1210,7 +1227,8 @@ private string BuildOrchestratorPlanningPrompt(string userPrompt, List w sb.AppendLine("Detailed task description for this worker."); sb.AppendLine("@end"); sb.AppendLine(); - sb.AppendLine("You may include brief analysis before the @worker blocks, but you MUST produce at least one @worker block."); + sb.AppendLine($"CRITICAL: Assign ALL workers that have relevant work IN THIS SINGLE RESPONSE. You have {workerNames.Count} workers — produce multiple @worker blocks now, not one at a time."); + sb.AppendLine("You may include brief analysis before the @worker blocks, but every response MUST contain @worker blocks for all workers you intend to use."); if (dispatcherOnly) sb.AppendLine("NEVER attempt to do the work yourself. ALWAYS delegate via @worker blocks."); return sb.ToString(); From 30132f97baa32a6d4dfc255f41ea5489830731ca Mon Sep 17 00:00:00 2001 From: Shane Date: Tue, 10 Mar 2026 10:59:30 -0500 Subject: [PATCH 5/6] Apply PR Review Squad findings: fix reconnect lock, watchdog tests, dedup helper - Fix reconnect lock double-check dead code: replace !IsConnectionError(ex) (always false) with !ReferenceEquals(_client, client) to detect if another worker already reconnected (CRITICAL) - Update ComputeEffectiveTimeout test helper to match 3-tier production logic (was using old 2-tier formula, causing false-negative tests) - Add WatchdogTimeout_UsedToolsIdle_Uses180s test for the middle tier - Update WatchdogTimeout_BetweenToolRounds to expect 180s (not 600s) - Update WatchdogTimeout_MultiAgent to expect 120s (isMultiAgent no longer escalates) - Update AllCombinations theory data for 3-tier formula - Extract DeduplicateAssignments() helper replacing 5 copy-pasted GroupBy chains - Add WorkerExecutionTimeout named constant (was magic TimeSpan.FromMinutes(10)) - Document _groupDispatchLocks silent-skip invariant - Narrow bare catch {} to catch (JsonException) in TryParseJsonAssignments - Add StringComparison.Ordinal to json.StartsWith Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot.Tests/ChatExperienceSafetyTests.cs | 42 ++++++++++----- .../Services/CopilotService.Organization.cs | 53 ++++++++++--------- PolyPilot/Services/CopilotService.cs | 3 +- 3 files changed, 59 insertions(+), 39 deletions(-) diff --git a/PolyPilot.Tests/ChatExperienceSafetyTests.cs b/PolyPilot.Tests/ChatExperienceSafetyTests.cs index 0f6889b8ca..1bb045ab2c 100644 --- a/PolyPilot.Tests/ChatExperienceSafetyTests.cs +++ b/PolyPilot.Tests/ChatExperienceSafetyTests.cs @@ -426,30 +426,33 @@ private static int ComputeEffectiveTimeout( bool isResumed, bool hasReceivedEvents) { var useResumeQuiescence = isResumed && !hasReceivedEvents && !hasActiveTool && !hasUsedTools; - var useToolTimeout = hasActiveTool || (isResumed && !useResumeQuiescence) || hasUsedTools || isMultiAgent; + var useToolTimeout = hasActiveTool || (isResumed && !useResumeQuiescence); + var useUsedToolsTimeout = !useToolTimeout && hasUsedTools && !hasActiveTool; return useResumeQuiescence ? CopilotService.WatchdogResumeQuiescenceTimeoutSeconds : useToolTimeout ? CopilotService.WatchdogToolExecutionTimeoutSeconds - : CopilotService.WatchdogInactivityTimeoutSeconds; + : useUsedToolsTimeout + ? CopilotService.WatchdogUsedToolsIdleTimeoutSeconds + : CopilotService.WatchdogInactivityTimeoutSeconds; } /// - /// INV-5: HasUsedToolsThisTurn BETWEEN tool rounds must keep 600s timeout. + /// INV-5: HasUsedToolsThisTurn BETWEEN tool rounds must keep 180s timeout (used-tools idle tier). /// This is the primary protection against "messages killed during long-running processes." /// ActiveToolCallCount resets on AssistantTurnStartEvent between rounds — only /// HasUsedToolsThisTurn persists and keeps the longer timeout. /// [Fact] - public void WatchdogTimeout_BetweenToolRounds_Uses600s() + public void WatchdogTimeout_BetweenToolRounds_Uses180s() { // Between tool rounds: ActiveToolCallCount=0, but HasUsedToolsThisTurn=true var timeout = ComputeEffectiveTimeout( hasActiveTool: false, hasUsedTools: true, isMultiAgent: false, isResumed: false, hasReceivedEvents: false); - Assert.Equal(CopilotService.WatchdogToolExecutionTimeoutSeconds, timeout); - Assert.Equal(600, timeout); + Assert.Equal(CopilotService.WatchdogUsedToolsIdleTimeoutSeconds, timeout); + Assert.Equal(180, timeout); } /// Active tool execution gets the 600s timeout. @@ -462,14 +465,14 @@ public void WatchdogTimeout_ActiveTool_Uses600s() Assert.Equal(600, timeout); } - /// Multi-agent sessions always get 600s to prevent killing workers mid-task. + /// Multi-agent sessions without active tools get 120s base timeout (isMultiAgent alone no longer escalates). [Fact] - public void WatchdogTimeout_MultiAgent_Uses600s() + public void WatchdogTimeout_MultiAgent_Uses120s() { var timeout = ComputeEffectiveTimeout( hasActiveTool: false, hasUsedTools: false, isMultiAgent: true, isResumed: false, hasReceivedEvents: false); - Assert.Equal(600, timeout); + Assert.Equal(120, timeout); } /// Resumed session with no events → 30s quiescence (fast recovery). @@ -483,6 +486,17 @@ public void WatchdogTimeout_ResumedNoEvents_Uses30sQuiescence() Assert.Equal(30, timeout); } + /// Used tools but none active → 180s middle tier (between 600s active and 120s base). + [Fact] + public void WatchdogTimeout_UsedToolsIdle_Uses180s() + { + var timeout = ComputeEffectiveTimeout( + hasActiveTool: false, hasUsedTools: true, isMultiAgent: false, + isResumed: false, hasReceivedEvents: false); + Assert.Equal(CopilotService.WatchdogUsedToolsIdleTimeoutSeconds, timeout); + Assert.Equal(180, timeout); + } + /// Resumed session with events flowing → 600s (session is active). [Fact] public void WatchdogTimeout_ResumedWithEvents_Uses600s() @@ -511,12 +525,12 @@ public void WatchdogTimeout_BaseCase_Uses120s() [Theory] [InlineData(false, false, false, false, false, 120)] // base case [InlineData(true, false, false, false, false, 600)] // active tool - [InlineData(false, true, false, false, false, 600)] // used tools (between rounds!) - [InlineData(false, false, true, false, false, 600)] // multi-agent - [InlineData(true, true, false, false, false, 600)] // active + used + [InlineData(false, true, false, false, false, 180)] // used tools (between rounds) → 180s middle tier + [InlineData(false, false, true, false, false, 120)] // multi-agent alone → base (no escalation) + [InlineData(true, true, false, false, false, 600)] // active + used → active wins (600s) [InlineData(true, false, true, false, false, 600)] // active + multi - [InlineData(false, true, true, false, false, 600)] // used + multi - [InlineData(true, true, true, false, false, 600)] // all three + [InlineData(false, true, true, false, false, 180)] // used + multi → used-tools tier (180s) + [InlineData(true, true, true, false, false, 600)] // all three → active wins (600s) public void WatchdogTimeout_AllCombinations( bool hasActive, bool hasUsed, bool isMulti, bool isResumed, bool hasEvents, int expected) diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index 2fb4dffac9..93f4a08d55 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -29,6 +29,9 @@ public partial class CopilotService { public event Action? OnOrchestratorPhaseChanged; // groupId, phase, detail + /// Maximum time a single worker is allowed to run before being cancelled. + private static readonly TimeSpan WorkerExecutionTimeout = TimeSpan.FromMinutes(10); + // Per-session semaphores to prevent concurrent model switches during rapid dispatch private readonly ConcurrentDictionary _modelSwitchLocks = new(); @@ -41,6 +44,11 @@ public partial class CopilotService // Per-group semaphore to prevent concurrent orchestrator dispatches. // The bridge's send_message handler and event queue drain can both call // SendToMultiAgentGroupAsync for the same group; this ensures only one runs. + // Per-group semaphore to prevent concurrent orchestrator dispatches. + // The bridge's send_message handler and event queue drain can both call + // SendToMultiAgentGroupAsync for the same group; this ensures only one runs. + // INVARIANT: A second concurrent dispatch is silently skipped (WaitAsync(0) returns false) + // and logged at Debug level. This prevents "Session already processing" errors. private readonly ConcurrentDictionary _groupDispatchLocks = new(); // Queued user prompts received while a reflect loop is running. @@ -1094,11 +1102,7 @@ private async Task SendViaOrchestratorAsync(string groupId, List members Debug($"[DISPATCH] '{orchestratorName}' iteration {dispatchIter}: {rawAssignments.Count} raw assignments. Response length={responseToParse.Length}"); // Deduplicate: merge multiple tasks for the same worker - var iterAssignments = rawAssignments - .GroupBy(a => a.WorkerName, StringComparer.OrdinalIgnoreCase) - .Select(g => new TaskAssignment(g.Key, string.Join("\n\n---\n\n", g.Select(a => a.Task)))) - .Where(a => !dispatchedWorkers.Contains(a.WorkerName)) // skip already-dispatched - .ToList(); + var iterAssignments = DeduplicateAssignments(rawAssignments, dispatchedWorkers); if (iterAssignments.Count == 0 && dispatchIter == 0) { @@ -1106,10 +1110,7 @@ private async Task SendViaOrchestratorAsync(string groupId, List members Debug($"[DISPATCH] No assignments parsed. Sending delegation nudge."); var nudgePrompt = BuildDelegationNudgePrompt(workerNames); var nudgeResponse = await SendPromptAndWaitAsync(orchestratorName, nudgePrompt, cancellationToken, originalPrompt: prompt); - iterAssignments = ParseTaskAssignments(nudgeResponse, workerNames) - .GroupBy(a => a.WorkerName, StringComparer.OrdinalIgnoreCase) - .Select(g => new TaskAssignment(g.Key, string.Join("\n\n---\n\n", g.Select(a => a.Task)))) - .ToList(); + iterAssignments = DeduplicateAssignments(ParseTaskAssignments(nudgeResponse, workerNames), dispatchedWorkers); Debug($"[DISPATCH] Nudge parsed: {iterAssignments.Count} assignments."); } @@ -1236,6 +1237,19 @@ private string BuildOrchestratorPlanningPrompt(string userPrompt, List w internal record TaskAssignment(string WorkerName, string Task); + /// Deduplicate raw assignments by merging tasks for the same worker. + private static List DeduplicateAssignments( + List raw, + HashSet? excludeWorkers = null) + { + var query = raw + .GroupBy(a => a.WorkerName, StringComparer.OrdinalIgnoreCase) + .Select(g => new TaskAssignment(g.Key, string.Join("\n\n---\n\n", g.Select(a => a.Task)))); + if (excludeWorkers != null) + query = query.Where(a => !excludeWorkers.Contains(a.WorkerName)); + return query.ToList(); + } + /// /// Builds a delegation nudge prompt with explicit format example. /// The multiline format is required because ParseTaskAssignments' regex @@ -1300,7 +1314,7 @@ internal static List TryParseJsonAssignments(string response, Li json = fenceMatch.Groups[1].Value.Trim(); // Must start with [ to be a JSON array - if (!json.StartsWith("[")) return assignments; + if (!json.StartsWith("[", StringComparison.Ordinal)) return assignments; using var doc = System.Text.Json.JsonDocument.Parse(json); foreach (var element in doc.RootElement.EnumerateArray()) @@ -1315,7 +1329,7 @@ internal static List TryParseJsonAssignments(string response, Li assignments.Add(new TaskAssignment(resolved, task)); } } - catch { /* Not valid JSON — fall through to regex */ } + catch (System.Text.Json.JsonException) { /* Not valid JSON — fall through to regex */ } return assignments; } @@ -1396,7 +1410,7 @@ private async Task SendPromptAndWaitAsync(string sessionName, string pro // Do NOT capture state and await its TCS separately: reconnection replaces the state // object, orphaning the old TCS and causing a 10-minute hang. using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.CancelAfter(TimeSpan.FromMinutes(10)); + cts.CancelAfter(WorkerExecutionTimeout); // Wire CTS to ResponseCompletion TCS so the 10-minute timeout actually cancels the await. // Must look up from _sessions dict (not captured ref) since reconnect replaces state. await using var ctsReg = cts.Token.Register(() => @@ -2207,10 +2221,7 @@ private async Task SendViaOrchestratorReflectAsync(string groupId, List var planResponse = await SendPromptAndWaitAsync(orchestratorName, planPrompt, ct, originalPrompt: prompt); var rawAssignments = ParseTaskAssignments(planResponse, workerNames); Debug($"[DISPATCH] '{orchestratorName}' reflect plan parsed: {rawAssignments.Count} raw assignments from {workerNames.Count} workers. Iteration={reflectState.CurrentIteration}, Response length={planResponse.Length}"); - var assignments = rawAssignments - .GroupBy(a => a.WorkerName, StringComparer.OrdinalIgnoreCase) - .Select(g => new TaskAssignment(g.Key, string.Join("\n\n---\n\n", g.Select(a => a.Task)))) - .ToList(); + var assignments = DeduplicateAssignments(rawAssignments); if (assignments.Count == 0) { @@ -2227,10 +2238,7 @@ private async Task SendViaOrchestratorReflectAsync(string groupId, List Debug($"[DISPATCH] '{orchestratorName}' nudge parsed: {nudgeAssignments.Count} raw assignments. Response length={nudgeResponse.Length}"); if (nudgeAssignments.Count > 0) { - assignments = nudgeAssignments - .GroupBy(a => a.WorkerName, StringComparer.OrdinalIgnoreCase) - .Select(g => new TaskAssignment(g.Key, string.Join("\n\n---\n\n", g.Select(a => a.Task)))) - .ToList(); + assignments = DeduplicateAssignments(nudgeAssignments); // Fall through to dispatch below } else @@ -2262,10 +2270,7 @@ private async Task SendViaOrchestratorReflectAsync(string groupId, List // Merge any @worker assignments from queued prompt responses if (queuedAssignments.Count > 0) { - var extra = queuedAssignments - .GroupBy(a => a.WorkerName, StringComparer.OrdinalIgnoreCase) - .Select(g => new TaskAssignment(g.Key, string.Join("\n\n---\n\n", g.Select(a => a.Task)))) - .ToList(); + var extra = DeduplicateAssignments(queuedAssignments); foreach (var a in extra) { var existing = assignments.FirstOrDefault(x => string.Equals(x.WorkerName, a.WorkerName, StringComparison.OrdinalIgnoreCase)); diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index c90c58584f..a3ca55bfd8 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -2506,7 +2506,8 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis try { // Double-check: another worker may have already reconnected while we waited. - if (_client != null && !IsConnectionError(ex)) + // Compare references — if _client changed, someone else already recreated it. + if (!ReferenceEquals(_client, client)) { Debug($"Client already reconnected by another worker, skipping recreate for '{sessionName}'"); client = _client; From 1d34a9a45b796b010bcdf82c8254da3621a23b5d Mon Sep 17 00:00:00 2001 From: Shane Date: Tue, 10 Mar 2026 11:50:00 -0500 Subject: [PATCH 6/6] fix: queue messages instead of dropping them during active dispatch The _groupDispatchLocks guard used WaitAsync(0) which silently dropped any message arriving while a dispatch was in progress. This caused user messages sent to a busy orchestrator to vanish entirely. Changed to WaitAsync(ct) so concurrent callers wait their turn and execute sequentially instead of being discarded. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Services/CopilotService.Organization.cs | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index 93f4a08d55..80daafb69e 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -41,14 +41,12 @@ public partial class CopilotService // causing worker results to be silently lost. private readonly ConcurrentDictionary _reflectLoopLocks = new(); - // Per-group semaphore to prevent concurrent orchestrator dispatches. + // Per-group semaphore to serialize orchestrator dispatches. // The bridge's send_message handler and event queue drain can both call - // SendToMultiAgentGroupAsync for the same group; this ensures only one runs. - // Per-group semaphore to prevent concurrent orchestrator dispatches. - // The bridge's send_message handler and event queue drain can both call - // SendToMultiAgentGroupAsync for the same group; this ensures only one runs. - // INVARIANT: A second concurrent dispatch is silently skipped (WaitAsync(0) returns false) - // and logged at Debug level. This prevents "Session already processing" errors. + // SendToMultiAgentGroupAsync for the same group; this ensures they run sequentially. + // Concurrent callers wait in line rather than running simultaneously, which prevents + // "Session already processing" errors from overlapping SendPromptAndWaitAsync calls. + // New user messages sent while a dispatch is in progress execute after the current one completes. private readonly ConcurrentDictionary _groupDispatchLocks = new(); // Queued user prompts received while a reflect loop is running. @@ -933,13 +931,10 @@ public async Task SendToMultiAgentGroupAsync(string groupId, string prompt, Canc var members = GetMultiAgentGroupMembers(groupId); if (members.Count == 0) { Debug($"[DISPATCH] SendToMultiAgentGroupAsync: no members for group '{group.Name}'"); return; } - // Prevent concurrent dispatches to the same group (bridge + event queue drain race) + // Serialize dispatches to the same group (bridge + event queue drain race). + // Callers wait their turn rather than being dropped. var dispatchLock = _groupDispatchLocks.GetOrAdd(groupId, _ => new SemaphoreSlim(1, 1)); - if (!await dispatchLock.WaitAsync(0, cancellationToken)) - { - Debug($"[DISPATCH] SendToMultiAgentGroupAsync: group '{group.Name}' dispatch already in progress, skipping"); - return; - } + await dispatchLock.WaitAsync(cancellationToken); try {