From 2daaf73351c9e9ed5bbf25ff3ac2395ea6e35bc4 Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Fri, 6 Mar 2026 19:43:35 -0700 Subject: [PATCH 1/5] Add built-in Skill Validator multi-agent preset MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new built-in GroupPreset called "Skill Validator" (⚖️) that pits two evaluators against each other to assess skills: - Worker 1 (Dotnet Skill Validator): empirical, outcome-focused evaluation using methodology inspired by dotnet/skills skill-validator — measures baseline vs. skill-augmented performance, pairwise comparative judging, statistical confidence assessment. - Worker 2 (Anthropic Skill Evaluator): prompt-design-focused evaluation assessing description quality/trigger accuracy, instruction clarity, scope appropriateness, and test coverage. - Orchestrator: routes work to both evaluators, highlights agreements and disagreements, explains which suggestions are adopted and why, produces a consensus KEEP/IMPROVE/REMOVE verdict. Configuration: - Mode: OrchestratorReflect - OrchestratorModel: claude-opus-4.6 - WorkerModels: 2x claude-sonnet-4.6 - MaxReflectIterations: 6 Also updates Scenario_CreateGroupFromPreset test to expect 3 built-in presets and adds BuiltInPresets_IncludeSkillValidator test. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot.Tests/SessionOrganizationTests.cs | 20 +- PolyPilot/Models/ModelCapabilities.cs | 191 ++++++++++++++++++++ 2 files changed, 209 insertions(+), 2 deletions(-) diff --git a/PolyPilot.Tests/SessionOrganizationTests.cs b/PolyPilot.Tests/SessionOrganizationTests.cs index 2e1e5df14d..e123df7a22 100644 --- a/PolyPilot.Tests/SessionOrganizationTests.cs +++ b/PolyPilot.Tests/SessionOrganizationTests.cs @@ -1364,6 +1364,22 @@ public void BuiltInPresets_IncludePRReviewSquad() Assert.NotNull(prSquad.WorkerSystemPrompts); Assert.Equal(prSquad.WorkerModels.Length, prSquad.WorkerSystemPrompts!.Length); } + + [Fact] + public void BuiltInPresets_IncludeSkillValidator() + { + var skillValidator = GroupPreset.BuiltIn.FirstOrDefault(p => p.Name == "Skill Validator"); + Assert.NotNull(skillValidator); + Assert.Equal(2, skillValidator!.WorkerModels.Length); + Assert.Equal(MultiAgentMode.OrchestratorReflect, skillValidator.Mode); + Assert.Equal("⚖️", skillValidator.Emoji); + Assert.NotNull(skillValidator.SharedContext); + Assert.NotNull(skillValidator.RoutingContext); + Assert.NotNull(skillValidator.WorkerSystemPrompts); + Assert.Equal(2, skillValidator.WorkerSystemPrompts!.Length); + Assert.All(skillValidator.WorkerSystemPrompts, p => Assert.False(string.IsNullOrWhiteSpace(p))); + Assert.NotNull(skillValidator.MaxReflectIterations); + } } public class GroupModelAnalyzerTests @@ -1766,7 +1782,7 @@ public class MultiAgentScenarioTests /// /// User flow: /// 1. Click 🚀 Preset in sidebar toolbar - /// 2. Preset picker appears showing 2 built-in templates + /// 2. Preset picker appears showing 3 built-in templates /// 3. Select "PR Review Squad" (📋) /// 4. System creates: Orchestrator (claude-opus-4.6) + 5 Workers /// 5. Sidebar shows group with mode selector set to "🎯 Orchestrator" @@ -1777,7 +1793,7 @@ public void Scenario_CreateGroupFromPreset() { // Step 1-2: User sees built-in presets var presets = GroupPreset.BuiltIn; - Assert.Equal(2, presets.Length); + Assert.Equal(3, presets.Length); // Step 3: User picks "PR Review Squad" var prReview = presets.First(p => p.Name == "PR Review Squad"); diff --git a/PolyPilot/Models/ModelCapabilities.cs b/PolyPilot/Models/ModelCapabilities.cs index 13634ab818..c8773e8695 100644 --- a/PolyPilot/Models/ModelCapabilities.cs +++ b/PolyPilot/Models/ModelCapabilities.cs @@ -344,6 +344,197 @@ public record GroupPreset(string Name, string Description, string Emoji, MultiAg """, MaxReflectIterations = 10, }, + + new GroupPreset( + "Skill Validator", "Two evaluators assess your skill from different angles — dotnet empirical testing vs. Anthropic design review — orchestrator builds consensus", + "⚖️", MultiAgentMode.OrchestratorReflect, + "claude-opus-4.6", new[] { "claude-sonnet-4.6", "claude-sonnet-4.6" }) + { + WorkerSystemPrompts = new[] + { + """ + You are the Dotnet Skill Validator. You evaluate skills empirically using the methodology from the dotnet/skills skill-validator tool. + + ## Your Evaluation Approach + + For each skill under evaluation, perform these steps: + + ### 1. Inspect the Skill Definition + - Read the SKILL.md file carefully + - Read the skill's `tests/eval.yaml` if present + - Identify what scenarios the skill claims to improve + + ### 2. Empirical Baseline Comparison + - For each scenario in the skill's eval.yaml (or infer representative scenarios from SKILL.md): + a. Describe what the agent response would look like WITHOUT the skill (baseline) + b. Describe what the agent response looks like WITH the skill + c. Assess: token efficiency, tool call count, task completion, error rate + + ### 3. Pairwise Comparative Judgment + - Compare baseline vs. skill-augmented performance side-by-side + - Apply position-swap bias mitigation: consider both orderings in your judgment + - Rate improvement on a scale of -1 (degraded) to +1 (strong improvement) with confidence + + ### 4. Statistical Assessment + - Estimate variance across the scenario set + - Flag scenarios where the skill helps most vs. least + - Identify scenarios where the skill might cause regressions + + ### 5. Verdict + Format your verdict as: + ``` + ## Dotnet Validator Verdict + **Overall Score**: X/10 + **Confidence**: High / Medium / Low + **Verdict**: KEEP / IMPROVE / REMOVE + + ### Strengths + - [specific strengths with evidence] + + ### Weaknesses + - [specific weaknesses with evidence] + + ### Suggested Improvements + - [concrete, actionable suggestions] + ``` + + ## Rules + - Be data-driven: cite specific scenarios and observed behaviors + - Focus on measurable outcomes: does it reduce tokens? improve task completion? reduce errors? + - Don't recommend keeping a skill that shows no measurable improvement in your empirical analysis + """, + + """ + You are the Anthropic Skill Evaluator. You evaluate skills through the lens of prompt engineering quality, trigger accuracy, and agent guidance design. + + ## Your Evaluation Approach + + For each skill under evaluation, assess these dimensions: + + ### 1. Description Quality (Trigger Accuracy) + - Is the description specific enough to trigger reliably for its intended use cases? + - Is it too broad — will it trigger for unintended tasks? + - Does it include enough trigger phrases/keywords to match user intent? + - Rate trigger precision (0-10) and recall (0-10) + + ### 2. Instruction Clarity + - Are the instructions in SKILL.md clear, actionable, and unambiguous? + - Do they tell the agent *exactly* what to do and in what order? + - Are there missing edge cases or situations the skill doesn't handle? + - Does the skill avoid over-constraining the agent in ways that limit helpfulness? + + ### 3. Scope Appropriateness + - Is the skill focused on a single, well-defined capability? + - Is the skill too broad (trying to do too much) or too narrow (not useful enough)? + - Does the skill overlap significantly with other skills? (potential conflicts) + + ### 4. Test Coverage + - Does the eval.yaml cover the happy path? + - Does it cover edge cases and failure modes? + - Are negative test cases present (things the skill should NOT do)? + + ### 5. Verdict + Format your verdict as: + ``` + ## Anthropic Evaluator Verdict + **Overall Score**: X/10 + **Trigger Precision**: X/10 | **Trigger Recall**: X/10 + **Verdict**: KEEP / IMPROVE / REMOVE + + ### Strengths + - [specific strengths] + + ### Weaknesses + - [specific weaknesses] + + ### Suggested Improvements + - [concrete rewrites or additions with examples] + ``` + + ## Rules + - Be concrete: quote specific lines from SKILL.md when critiquing + - Focus on prompt quality, not empirical test results + - Suggest specific rewrites, not vague advice like "be clearer" + """, + }, + SharedContext = """ + ## Skill Evaluation Standards + + Both evaluators assess the same skill and produce independent verdicts. + A good skill must satisfy BOTH evaluators to be marked KEEP. + + ### What makes a skill worth keeping + - Measurable improvement in task completion (Dotnet validator perspective) + - Clear, precise description that triggers reliably (Anthropic evaluator perspective) + - Focused scope — does one thing well + - Actionable instructions that guide the agent without over-constraining it + - Adequate test coverage + + ### What warrants IMPROVE + - Good intent but fixable gaps (bad trigger description, missing scenarios, ambiguous instructions) + - One evaluator says KEEP but the other says REMOVE with specific concerns + + ### What warrants REMOVE + - No measurable improvement in empirical testing + - Trigger description too broad/narrow to be useful + - Instructions that would cause regressions or confusion + + ### Consensus Rule + - KEEP requires both evaluators to say KEEP or one KEEP + one IMPROVE + - IMPROVE if the evaluators disagree or both say IMPROVE + - REMOVE if either evaluator says REMOVE with strong evidence + """, + RoutingContext = """ + ## Skill Validator Orchestration + + You orchestrate two skill evaluators who assess skills from different angles. Your role is to: + 1. Dispatch the skill to BOTH evaluators simultaneously (or sequentially if needed) + 2. Collect their verdicts + 3. Identify where they agree and disagree + 4. Build a consensus verdict explaining which suggestions to adopt and why + + ### Worker Names + - **worker-1** = Dotnet Skill Validator (empirical, outcome-focused) + - **worker-2** = Anthropic Skill Evaluator (prompt-design-focused) + + ### Dispatch Pattern + 1. **First dispatch**: Send the skill content to BOTH workers. Include the full SKILL.md content and eval.yaml if available. + 2. **After both complete**: Compare their verdicts. Identify agreements (high confidence) and disagreements (requires judgment). + 3. **Build consensus**: Produce a final report that explains which worker's suggestions are adopted, which are declined, and why. + + ### Consensus Report Format + ``` + ## Skill Validator Consensus Report: [Skill Name] + + ### Summary + **Dotnet Verdict**: KEEP/IMPROVE/REMOVE (X/10) + **Anthropic Verdict**: KEEP/IMPROVE/REMOVE (X/10) + **Consensus**: KEEP / IMPROVE / REMOVE + + ### Points of Agreement (High Confidence) + - [issues both evaluators flagged] + + ### Points of Disagreement (Requires Judgment) + - [Dotnet says X, Anthropic says Y — adopted: Z because ...] + + ### Adopted Suggestions + - [suggestions we are recommending, with rationale] + + ### Declined Suggestions + - [suggestions we are NOT adopting, with rationale] + + ### Final Recommendation + [1-2 sentence actionable summary] + ``` + + ### Rules + - Always explain WHY suggestions are adopted or declined + - Where evaluators disagree, explain the tradeoff and make a reasoned judgment + - Highlight suggestions both evaluators agree on as high-confidence improvements + - NEVER emit [[GROUP_REFLECT_COMPLETE]] until both evaluators have responded and a consensus report is produced + """, + MaxReflectIterations = 6, + }, }; } From 0359d3bc7035835ca3f208fb099c0cc811c9a109 Mon Sep 17 00:00:00 2001 From: Shane Date: Tue, 10 Mar 2026 17:57:25 -0500 Subject: [PATCH 2/5] Fix stuck sessions with escalation timeout, tool health check, and re-dispatch clearing Three fixes to dramatically reduce recovery time for stuck sessions: 1. Escalation timeout: After the first 600s timeout with no events (but server alive), switch to 60s timeout for subsequent checks. Reduces max stuck time from ~40 minutes to ~12 minutes. 2. Tool health check: Start a 30s timer when a tool begins executing. If no events arrive within 30s, check if the connection is still alive. After 3 consecutive stale checks (90s total), trigger recovery. This detects dead connections much faster than waiting for the 10-minute watchdog. 3. Re-dispatch clearing: Before re-dispatching workers after app restart, clear any stuck IsProcessing/ActiveToolCallCount/SendingFlag state from the previous incomplete turn. This allows SendPromptAsync to accept the new prompt instead of rejecting with 'already processing'. Also resets HasUsedToolsThisTurn on reconnect so the watchdog uses the appropriate timeout (120s instead of 600s) for the fresh connection. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot/Services/CopilotService.Events.cs | 154 +++++++++++++++++- .../Services/CopilotService.Organization.cs | 17 ++ PolyPilot/Services/CopilotService.cs | 13 ++ 3 files changed, 176 insertions(+), 8 deletions(-) diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index 6d3ee23701..98e3250d59 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -308,6 +308,9 @@ void Invoke(Action action) if (toolStart.Data == null) break; Interlocked.Increment(ref state.ActiveToolCallCount); state.HasUsedToolsThisTurn = true; // volatile field — no explicit barrier needed + // Record tool start time and schedule health check + Interlocked.Exchange(ref state.ToolStartedAtTicks, DateTime.UtcNow.Ticks); + ScheduleToolHealthCheck(state, sessionName); if (state.Info.ProcessingPhase < 3) { state.Info.ProcessingPhase = 3; // Working @@ -362,6 +365,8 @@ void Invoke(Action action) case ToolExecutionCompleteEvent toolDone: if (toolDone.Data == null) break; Interlocked.Decrement(ref state.ActiveToolCallCount); + // Cancel the tool health check timer since tool completed normally + CancelToolHealthCheck(state); Interlocked.Increment(ref state.Info._toolCallCount); var completeCallId = toolDone.Data.ToolCallId ?? ""; var completeToolName = toolDone.Data?.GetType().GetProperty("ToolName")?.GetValue(toolDone.Data)?.ToString(); @@ -1390,6 +1395,11 @@ private void HandleReflectionAdvanceResult(SessionState state, string response, /// If no SDK events arrive for this many seconds while a tool is actively executing, the session is considered stuck. /// This is much longer because legitimate tool executions (e.g., running UI tests, long builds) can take many minutes. internal const int WatchdogToolExecutionTimeoutSeconds = 600; + /// After the first Case A reset (tool running + server alive but no events), switch to this + /// shorter timeout for subsequent checks. This accelerates dead connection detection while still + /// allowing the first 600s for legitimate long-running tools. Total max stuck time becomes + /// 600s + (WatchdogMaxToolAliveResets × 60s) ≈ 12 minutes instead of 40 minutes. + internal const int WatchdogToolEscalationTimeoutSeconds = 60; // Sessions that USED tools but have none actively running — the model may be // thinking between tool rounds, but 600s is too long for a likely-dead session. internal const int WatchdogUsedToolsIdleTimeoutSeconds = 180; @@ -1408,8 +1418,16 @@ private void HandleReflectionAdvanceResult(SessionState state, string response, /// session's transport-level connection is broken (ConnectionLostException). Without this cap, /// Case A resets LastEventAtTicks indefinitely, and ProcessingStartedAt resets on each app /// restart — so neither the inactivity nor the max-time safety net ever fires. - /// (3+1) resets × 600s effective timeout ≈ 40 minutes max of Case A resets. - internal const int WatchdogMaxToolAliveResets = 3; + /// With escalation timeout (60s after first reset), total max stuck time is: + /// 600s (first) + 2 × 60s (escalation) = 720s ≈ 12 minutes. + internal const int WatchdogMaxToolAliveResets = 2; + + /// + /// Milliseconds after a tool starts to perform the first health check. If no events have + /// arrived since tool start, we verify the connection is still alive. This detects dead + /// connections within ~30s instead of waiting for the 600s watchdog timeout. + /// + internal const int ToolHealthCheckIntervalMs = 30_000; /// /// Milliseconds to wait after AssistantTurnEndEvent before firing CompleteResponse @@ -1436,6 +1454,115 @@ private static void CancelProcessingWatchdog(SessionState state) } } + /// + /// Cancels and disposes any pending tool health check timer. + /// + private static void CancelToolHealthCheck(SessionState state) + { + var prev = Interlocked.Exchange(ref state.ToolHealthCheckTimer, null); + prev?.Dispose(); + } + + /// + /// Schedules a tool health check to run after ToolHealthCheckIntervalMs. + /// If the tool completes before the timer fires, the timer is cancelled. + /// If the timer fires and no events have arrived since tool start, we check + /// if the connection is still alive and trigger recovery if it's dead. + /// + private void ScheduleToolHealthCheck(SessionState state, string sessionName) + { + // Cancel any previous health check timer + CancelToolHealthCheck(state); + + // Skip in demo/remote mode where we can't probe the local server + if (IsDemoMode || IsRemoteMode) return; + + var checkGeneration = Interlocked.Read(ref state.ProcessingGeneration); + var timer = new Timer(_ => + { + try + { + // Verify we're still on the same turn + if (Interlocked.Read(ref state.ProcessingGeneration) != checkGeneration) return; + if (!state.Info.IsProcessing) return; + + var activeTools = Volatile.Read(ref state.ActiveToolCallCount); + if (activeTools <= 0) return; // Tool completed normally + + // Check if any events arrived since tool start + var lastEventTicks = Interlocked.Read(ref state.LastEventAtTicks); + var toolStartTicks = Interlocked.Read(ref state.ToolStartedAtTicks); + var eventsSinceToolStart = lastEventTicks > toolStartTicks; + + if (eventsSinceToolStart) + { + // Events are still flowing - tool is legitimately running + // Schedule another check + Debug($"[TOOL-HEALTH] '{sessionName}' events flowing — rescheduling health check"); + ScheduleToolHealthCheck(state, sessionName); + return; + } + + // No events since tool start. Check if server is alive. + var serverAlive = _serverManager.IsServerRunning; + if (!serverAlive) + { + Debug($"[TOOL-HEALTH] '{sessionName}' server is DEAD — triggering immediate recovery"); + TriggerToolHealthRecovery(state, sessionName, "server not responding"); + return; + } + + // Server TCP port is alive, but no events for 30s. The connection might be dead. + // Increment the stale check counter and check if we should recover. + var staleChecks = Interlocked.Increment(ref state.WatchdogCaseAResets); + if (staleChecks > WatchdogMaxToolAliveResets) + { + Debug($"[TOOL-HEALTH] '{sessionName}' {staleChecks} stale health checks — assuming dead connection"); + TriggerToolHealthRecovery(state, sessionName, "no events after multiple health checks (connection likely dead)"); + return; + } + + Debug($"[TOOL-HEALTH] '{sessionName}' no events for {ToolHealthCheckIntervalMs/1000}s, server alive — " + + $"check {staleChecks}/{WatchdogMaxToolAliveResets}, scheduling another check"); + ScheduleToolHealthCheck(state, sessionName); + } + catch (Exception ex) + { + Debug($"[TOOL-HEALTH] '{sessionName}' check failed: {ex.Message}"); + } + }, null, ToolHealthCheckIntervalMs, Timeout.Infinite); + + Interlocked.Exchange(ref state.ToolHealthCheckTimer, timer); + } + + /// + /// Triggers recovery when tool health check detects a dead connection. + /// Clears the stuck processing state and notifies the user. + /// + private void TriggerToolHealthRecovery(SessionState state, string sessionName, string reason) + { + CancelToolHealthCheck(state); + + var activeTools = Volatile.Read(ref state.ActiveToolCallCount); + Debug($"[TOOL-HEALTH] '{sessionName}' triggering recovery: {reason} (activeTools={activeTools})"); + + InvokeOnUI(() => + { + if (!state.Info.IsProcessing) return; + + OnError?.Invoke(sessionName, $"Tool execution stuck ({reason}). Session recovered automatically."); + + // Clear the stuck state + state.Info.IsProcessing = false; + Interlocked.Exchange(ref state.ActiveToolCallCount, 0); + Interlocked.Exchange(ref state.SendingFlag, 0); + state.HasUsedToolsThisTurn = false; + state.ResponseCompletion?.TrySetResult(state.CurrentResponse.ToString()); + FlushCurrentResponse(state); + OnStateChanged?.Invoke(); + }); + } + /// /// Cancels and disposes any pending TurnEnd→Idle fallback timer on the state. /// Mirrors the CancelProcessingWatchdog pattern: Cancel + Dispose to avoid @@ -1551,13 +1678,22 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session var useToolTimeout = hasActiveTool || (state.Info.IsResumed && !useResumeQuiescence); var useUsedToolsTimeout = !useToolTimeout && hasUsedTools && !hasActiveTool; + + // After the first Case A reset (tool running + server alive but no events arrived), + // switch to the escalation timeout. This allows the first 600s for legitimate long-running + // tools, but speeds up dead connection detection on subsequent checks. + var caseAResets = Volatile.Read(ref state.WatchdogCaseAResets); + var useEscalationTimeout = useToolTimeout && caseAResets > 0; + var effectiveTimeout = useResumeQuiescence ? WatchdogResumeQuiescenceTimeoutSeconds - : useToolTimeout - ? WatchdogToolExecutionTimeoutSeconds - : useUsedToolsTimeout - ? WatchdogUsedToolsIdleTimeoutSeconds - : WatchdogInactivityTimeoutSeconds; + : useEscalationTimeout + ? WatchdogToolEscalationTimeoutSeconds + : useToolTimeout + ? WatchdogToolExecutionTimeoutSeconds + : useUsedToolsTimeout + ? WatchdogUsedToolsIdleTimeoutSeconds + : WatchdogInactivityTimeoutSeconds; // Safety net: check absolute max processing time, but only if events have also // gone stale. If events are still flowing (elapsed < effectiveTimeout), the session @@ -1614,8 +1750,10 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session } else { + // Next check will use escalation timeout (60s) since we've had at least one reset + var nextTimeout = WatchdogToolEscalationTimeoutSeconds; Debug($"[WATCHDOG] '{sessionName}' {elapsed:F0}s inactivity but tool is running and server is alive — resetting timer " + - $"(reset #{resets}/{WatchdogMaxToolAliveResets}, timeout={effectiveTimeout}s, totalProcessing={totalProcessingSeconds:F0}s)"); + $"(reset #{resets}/{WatchdogMaxToolAliveResets}, nextTimeout={nextTimeout}s, totalProcessing={totalProcessingSeconds:F0}s)"); Interlocked.Exchange(ref state.LastEventAtTicks, DateTime.UtcNow.Ticks); continue; // keep waiting — don't kill } diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index 80daafb69e..8b51abaaaf 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -1641,6 +1641,23 @@ private async Task MonitorAndSynthesizeAsync(PendingOrchestration pending, Cance InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(pending.GroupId, OrchestratorPhase.Dispatching, $"Re-dispatching {unstartedWorkers.Count} worker(s)")); + // BUG FIX: Clear stuck IsProcessing state on workers before re-dispatch. + // After an app restart, workers may have IsProcessing=true from a previous + // incomplete turn (e.g., tool.execution_start with no tool.execution_complete). + // This prevents SendPromptAsync from accepting new prompts. Clear it here + // so re-dispatch can actually send the prompt. + foreach (var workerName in unstartedWorkers) + { + if (_sessions.TryGetValue(workerName, out var workerState) && workerState.Info.IsProcessing) + { + Debug($"[DISPATCH] Clearing stuck IsProcessing on '{workerName}' before re-dispatch"); + workerState.Info.IsProcessing = false; + Interlocked.Exchange(ref workerState.ActiveToolCallCount, 0); + Interlocked.Exchange(ref workerState.SendingFlag, 0); + workerState.HasUsedToolsThisTurn = false; + } + } + // Build a generic task from the original prompt for re-dispatched workers. // Materialize as an array so we can inspect individual task results even on partial failure. var redispatchTaskArray = unstartedWorkers diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index dde17728ca..f6d05872f5 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -445,6 +445,13 @@ private class SessionState /// WatchdogMaxToolAliveResets to prevent infinite resets when the session's JSON-RPC /// connection is dead but the shared persistent server is still alive. public int WatchdogCaseAResets; + /// Timer that fires shortly after a tool starts to verify the connection is still alive. + /// If no tool completion event arrives within ToolHealthCheckIntervalMs, we do an active health + /// check to detect dead connections early (instead of waiting for the 600s watchdog timeout). + public Timer? ToolHealthCheckTimer; + /// Timestamp when the most recent tool started. Used by the tool health check to + /// determine if a tool has been running too long without any events. + public long ToolStartedAtTicks; } private void Debug(string message) @@ -2636,6 +2643,12 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis // orphaned old state can't pass generation checks on the new state. Interlocked.Exchange(ref newState.ProcessingGeneration, Interlocked.Read(ref state.ProcessingGeneration)); + // Reset tool tracking for the NEW connection. The old connection's + // tool state is stale — no tools have run on this connection yet. + // Without this, HasUsedToolsThisTurn=true from the dead connection + // inflates the watchdog timeout from 120s to 600s, making stuck + // sessions wait 5x longer than necessary to recover. + newState.HasUsedToolsThisTurn = false; Interlocked.Exchange(ref newState.ActiveToolCallCount, 0); Interlocked.Exchange(ref newState.SuccessfulToolCountThisTurn, 0); newState.IsMultiAgentSession = state.IsMultiAgentSession; From 2e7a9083d1138f28d6e93619716ff9f00e342895 Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Tue, 10 Mar 2026 21:17:37 -0500 Subject: [PATCH 3/5] Fix multi-agent worker stuck sessions with 4 parallel improvements 1. Stagger worker dispatch: Add 1s delay between worker launches to prevent burst connection saturation (previously 3/5 workers crashed with IOException) 2. IOException retry: Wrap SendPromptAndWaitAsync in retry loop (max 2 attempts, 2s delay) using existing IsConnectionError() helper for resilience 3. Smart Case A watchdog: Replace fixed timeout and reset cap with events.jsonl mtime freshness check. Fresh file (<60s) = wait indefinitely (tool actively running), stale = 1 confirmation cycle then terminate. Protects active tools. 4. TurnEnd fallback re-arm: Add FallbackCanceledByTurnStart diagnostic flag so TurnEnd can log when re-arming the fallback timer after TurnStart canceled it Live test results: 5/5 workers connected (was 2/5), 4/5 returned real responses, full orchestration synthesis completed. Remaining empty response from worker-2 is caused by upstream SDK bug 299 (missing SessionIdleEvent). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot.Tests/ProcessingWatchdogTests.cs | 15 ++-- PolyPilot/Services/CopilotService.Events.cs | 68 ++++++++++++---- .../Services/CopilotService.Organization.cs | 80 ++++++++++++------- PolyPilot/Services/CopilotService.cs | 5 ++ 4 files changed, 116 insertions(+), 52 deletions(-) diff --git a/PolyPilot.Tests/ProcessingWatchdogTests.cs b/PolyPilot.Tests/ProcessingWatchdogTests.cs index eeb6538c0d..da216d4dd6 100644 --- a/PolyPilot.Tests/ProcessingWatchdogTests.cs +++ b/PolyPilot.Tests/ProcessingWatchdogTests.cs @@ -2457,9 +2457,10 @@ public void WatchdogMaxToolAliveResets_BoundsMaxStuckTime() [Fact] public void CaseA_ExceedingMaxResets_FallsThroughToKill_InSource() { - // Verify the watchdog's Case A checks WatchdogMaxToolAliveResets and falls - // through when exceeded. This is the core fix for the stuck-session bug where - // a dead session's tool appears active but no events ever arrive. + // Verify the watchdog's Case A uses events.jsonl freshness checks and falls + // through when the file is stale and the confirmation cycle is exceeded. + // This is the core fix for the stuck-session bug where a dead session's tool + // appears active but no events ever arrive. var source = File.ReadAllText( Path.Combine(GetRepoRoot(), "PolyPilot", "Services", "CopilotService.Events.cs")); var methodIdx = source.IndexOf("private async Task RunProcessingWatchdogAsync"); @@ -2467,10 +2468,10 @@ public void CaseA_ExceedingMaxResets_FallsThroughToKill_InSource() var endIdx = source.IndexOf(" private readonly ConcurrentDictionary", methodIdx); var watchdogBody = source.Substring(methodIdx, endIdx - methodIdx); - // Case A must reference WatchdogMaxToolAliveResets - Assert.True(watchdogBody.Contains("WatchdogMaxToolAliveResets"), - "Case A must check WatchdogMaxToolAliveResets to cap consecutive resets"); - // Case A must increment WatchdogCaseAResets + // Case A must check events.jsonl freshness + Assert.True(watchdogBody.Contains("events.jsonl"), + "Case A must check events.jsonl freshness to distinguish active tools from dead connections"); + // Case A must increment WatchdogCaseAResets for confirmation cycles Assert.True(watchdogBody.Contains("WatchdogCaseAResets"), "Case A must track reset count via state.WatchdogCaseAResets"); } diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index 98e3250d59..1fd6a773f9 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -449,6 +449,7 @@ void Invoke(Action action) case AssistantTurnStartEvent: // Cancel any pending TurnEnd→Idle fallback — another agent round is starting CancelTurnEndFallback(state); + state.FallbackCanceledByTurnStart = true; state.HasReceivedDeltasThisTurn = false; var phaseAdvancedToThinking = state.Info.ProcessingPhase < 2; if (phaseAdvancedToThinking) state.Info.ProcessingPhase = 2; // Thinking @@ -469,7 +470,14 @@ void Invoke(Action action) } // Schedule a delayed CompleteResponse in case SessionIdleEvent never arrives (SDK bug #299). // Cancelled by AssistantTurnStartEvent (another round starting) or SessionIdleEvent (normal path). + // If TurnStart previously canceled the fallback, this re-arms it — creating the + // self-healing loop: TurnEnd → TurnStart cancel → TurnEnd re-arm → fallback fires. { + if (state.FallbackCanceledByTurnStart) + { + Debug($"[TURNEND-FALLBACK] '{sessionName}' re-arming fallback timer (was canceled by TurnStart)"); + state.FallbackCanceledByTurnStart = false; + } var turnEndGen = Interlocked.Read(ref state.ProcessingGeneration); var idleFallbackCts = new CancellationTokenSource(); // Capture token BEFORE publishing so CancelTurnEndFallback on another thread @@ -869,6 +877,7 @@ private void CompleteResponse(SessionState state, long? expectedGeneration = nul CancelTurnEndFallback(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; + state.FallbackCanceledByTurnStart = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); state.Info.IsResumed = false; // Clear after first successful turn var response = state.CurrentResponse.ToString(); @@ -1735,27 +1744,58 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session { if (hasActiveTool && !IsDemoMode && !IsRemoteMode) { - // Case A: check server TCP port + // Case A: check server TCP port + events.jsonl freshness var serverAlive = _serverManager.IsServerRunning; if (serverAlive) { - var resets = Interlocked.Increment(ref state.WatchdogCaseAResets); - if (resets > WatchdogMaxToolAliveResets) + // Check if the CLI is actively writing events for this session. + // events.jsonl is written by the CLI process, not our app. + // If recently modified → tool is actively running → wait indefinitely. + // If stale → connection is likely dead → kill immediately. + var eventsFileActive = false; + try { - // Too many consecutive resets with no real SDK events — the - // session's JSON-RPC connection is likely dead even though the - // shared persistent server is still alive. Fall through to kill. - Debug($"[WATCHDOG] '{sessionName}' Case A reset cap exceeded ({resets}/{WatchdogMaxToolAliveResets}) " + - $"— killing despite server alive (elapsed={elapsed:F0}s, totalProcessing={totalProcessingSeconds:F0}s)"); + var sessionId = state.Info.SessionId; + if (!string.IsNullOrEmpty(sessionId)) + { + var eventsPath = Path.Combine(SessionStatePath, sessionId, "events.jsonl"); + if (File.Exists(eventsPath)) + { + var lastWrite = File.GetLastWriteTimeUtc(eventsPath); + var fileAge = (DateTime.UtcNow - lastWrite).TotalSeconds; + eventsFileActive = fileAge < 60; // modified within last 60s + } + } } - else + catch { /* filesystem errors → fall through to reset-cap logic */ } + + if (eventsFileActive) { - // Next check will use escalation timeout (60s) since we've had at least one reset - var nextTimeout = WatchdogToolEscalationTimeoutSeconds; - Debug($"[WATCHDOG] '{sessionName}' {elapsed:F0}s inactivity but tool is running and server is alive — resetting timer " + - $"(reset #{resets}/{WatchdogMaxToolAliveResets}, nextTimeout={nextTimeout}s, totalProcessing={totalProcessingSeconds:F0}s)"); + // Events file is fresh — tool is actively running. Wait indefinitely. + Debug($"[WATCHDOG] '{sessionName}' tool is running and events.jsonl is fresh — waiting indefinitely " + + $"(elapsed={elapsed:F0}s, totalProcessing={totalProcessingSeconds:F0}s)"); Interlocked.Exchange(ref state.LastEventAtTicks, DateTime.UtcNow.Ticks); - continue; // keep waiting — don't kill + Interlocked.Exchange(ref state.WatchdogCaseAResets, 0); // reset counter since tool is active + continue; + } + else + { + // Events file is stale or missing — connection is likely dead. + // Use the reset-cap as a safety buffer (1 more cycle to confirm). + var resets = Interlocked.Increment(ref state.WatchdogCaseAResets); + if (resets > 1) // Only need 1 confirmation cycle since we have file evidence + { + Debug($"[WATCHDOG] '{sessionName}' events.jsonl stale and reset count {resets} > 1 " + + $"— killing despite server alive (elapsed={elapsed:F0}s, totalProcessing={totalProcessingSeconds:F0}s)"); + // fall through to kill + } + else + { + Debug($"[WATCHDOG] '{sessionName}' events.jsonl stale but giving 1 more cycle " + + $"(reset #{resets}, elapsed={elapsed:F0}s, totalProcessing={totalProcessingSeconds:F0}s)"); + Interlocked.Exchange(ref state.LastEventAtTicks, DateTime.UtcNow.Ticks); + continue; + } } } else diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index 8b51abaaaf..93e0dd9fed 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -1151,8 +1151,14 @@ private async Task SendViaOrchestratorAsync(string groupId, List members { InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(groupId, OrchestratorPhase.WaitingForWorkers, null)); - var workerTasks = assignments.Select(a => - ExecuteWorkerAsync(a.WorkerName, a.Task, prompt, cancellationToken)); + Debug($"[DISPATCH] Staggering {assignments.Count} workers with 1s delay"); + var workerTasks = new List>(); + foreach (var a in assignments) + { + workerTasks.Add(ExecuteWorkerAsync(a.WorkerName, a.Task, prompt, cancellationToken)); + if (workerTasks.Count < assignments.Count) + await Task.Delay(1000, cancellationToken); + } var results = await Task.WhenAll(workerTasks); // Phase 4: Synthesize — send worker results back to orchestrator @@ -1360,43 +1366,55 @@ private async Task ExecuteWorkerAsync(string workerName, string ta var workerPrompt = $"{identity}{worktreeNote}\n\nYour response will be collected and synthesized with other workers' responses.\n\n{sharedPrefix}## Original User Request (context)\n{originalPrompt}\n\n## Your Assigned Task\n{task}"; - try + const int maxRetries = 2; + for (int attempt = 1; attempt <= maxRetries; attempt++) { - Debug($"[DISPATCH] Worker '{workerName}' starting (prompt len={workerPrompt.Length})"); - var response = await SendPromptAndWaitAsync(workerName, workerPrompt, cancellationToken, originalPrompt: originalPrompt); - - // Worker revival: empty response means the session died (e.g., dead SSE stream - // after reconnect). Create a fresh session and retry once. - if (string.IsNullOrWhiteSpace(response)) + try { - Debug($"[DISPATCH] Worker '{workerName}' returned empty — attempting fresh session revival"); - if (_sessions.TryGetValue(workerName, out var deadState)) - { - try { await deadState.Session.DisposeAsync(); } catch { } + Debug($"[DISPATCH] Worker '{workerName}' starting (prompt len={workerPrompt.Length}, attempt={attempt})"); + var response = await SendPromptAndWaitAsync(workerName, workerPrompt, cancellationToken, originalPrompt: originalPrompt); - var workerMeta = GetSessionMeta(workerName); - var client = GetClientForGroup(workerMeta?.GroupId); - if (client != null) + // Worker revival: empty response means the session died (e.g., dead SSE stream + // after reconnect). Create a fresh session and retry once. + if (string.IsNullOrWhiteSpace(response)) + { + Debug($"[DISPATCH] Worker '{workerName}' returned empty — attempting fresh session revival"); + if (_sessions.TryGetValue(workerName, out var deadState)) { - var freshConfig = BuildFreshSessionConfig(deadState); - var freshSession = await client.CreateSessionAsync(freshConfig, cancellationToken); - deadState.Info.SessionId = freshSession.SessionId; - var freshState = new SessionState { Session = freshSession, Info = deadState.Info }; - _sessions[workerName] = freshState; - Debug($"[DISPATCH] Worker '{workerName}' revived with fresh session '{freshSession.SessionId}'"); - response = await SendPromptAndWaitAsync(workerName, workerPrompt, cancellationToken, originalPrompt: originalPrompt); + try { await deadState.Session.DisposeAsync(); } catch { } + + var workerMeta = GetSessionMeta(workerName); + var client = GetClientForGroup(workerMeta?.GroupId); + if (client != null) + { + var freshConfig = BuildFreshSessionConfig(deadState); + var freshSession = await client.CreateSessionAsync(freshConfig, cancellationToken); + deadState.Info.SessionId = freshSession.SessionId; + var freshState = new SessionState { Session = freshSession, Info = deadState.Info }; + _sessions[workerName] = freshState; + Debug($"[DISPATCH] Worker '{workerName}' revived with fresh session '{freshSession.SessionId}'"); + response = await SendPromptAndWaitAsync(workerName, workerPrompt, cancellationToken, originalPrompt: originalPrompt); + } } } - } - Debug($"[DISPATCH] Worker '{workerName}' completed (response len={response?.Length ?? 0}, elapsed={sw.Elapsed.TotalSeconds:F1}s)"); - return new WorkerResult(workerName, response, true, null, sw.Elapsed); - } - catch (Exception ex) - { - Debug($"[DISPATCH] Worker '{workerName}' FAILED: {ex.GetType().Name}: {ex.Message} (elapsed={sw.Elapsed.TotalSeconds:F1}s)"); - return new WorkerResult(workerName, null, false, ex.Message, sw.Elapsed); + Debug($"[DISPATCH] Worker '{workerName}' completed (response len={response?.Length ?? 0}, elapsed={sw.Elapsed.TotalSeconds:F1}s)"); + return new WorkerResult(workerName, response, true, null, sw.Elapsed); + } + catch (Exception ex) when (attempt < maxRetries && IsConnectionError(ex)) + { + Debug($"[DISPATCH] Worker '{workerName}' attempt {attempt} failed with {ex.GetType().Name} — retrying in 2s"); + await Task.Delay(2000, cancellationToken); + continue; + } + catch (Exception ex) + { + Debug($"[DISPATCH] Worker '{workerName}' FAILED: {ex.GetType().Name}: {ex.Message} (elapsed={sw.Elapsed.TotalSeconds:F1}s)"); + return new WorkerResult(workerName, null, false, ex.Message, sw.Elapsed); + } } + // Should never reach here, but just in case + return new WorkerResult(workerName, null, false, "Max retries exceeded", sw.Elapsed); } private async Task SendPromptAndWaitAsync(string sessionName, string prompt, CancellationToken cancellationToken, string? originalPrompt = null) diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index f6d05872f5..ba95d7a51d 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -445,6 +445,10 @@ private class SessionState /// WatchdogMaxToolAliveResets to prevent infinite resets when the session's JSON-RPC /// connection is dead but the shared persistent server is still alive. public int WatchdogCaseAResets; + /// True if the TurnEnd→Idle fallback was canceled by an AssistantTurnStartEvent. + /// Used for diagnostic logging: when the next TurnEnd re-arms the fallback, the log shows + /// the self-healing loop in action (TurnEnd → TurnStart cancel → TurnEnd re-arm). + public volatile bool FallbackCanceledByTurnStart; /// Timer that fires shortly after a tool starts to verify the connection is still alive. /// If no tool completion event arrives within ToolHealthCheckIntervalMs, we do an active health /// check to detect dead connections early (instead of waiting for the 600s watchdog timeout). @@ -2420,6 +2424,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis state.Info.ClearPermissionDenials(); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); // Reset stale tool count from previous turn state.HasUsedToolsThisTurn = false; // Reset stale tool flag from previous turn + state.FallbackCanceledByTurnStart = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); // Cancel any pending TurnEnd→Idle fallback from the previous turn CancelTurnEndFallback(state); From e30946777e787274c8c1bcbc89e2f6feec29c02c Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Tue, 10 Mar 2026 21:36:57 -0500 Subject: [PATCH 4/5] Increase WorkerExecutionTimeout from 10 to 60 minutes The smart Case A watchdog (events.jsonl freshness) now handles dead session detection in ~90 seconds. The WorkerExecutionTimeout only needs to be an absolute backstop. At 10 minutes, it was prematurely terminating workers with 200+ tool calls that were actively processing (e.g. long PR reviews). This caused cascading failures: the session stayed in IsProcessing=true from the SDK side, so re-dispatch attempts got 'Session is already processing a request' errors. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- PolyPilot/Services/CopilotService.Organization.cs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index 93e0dd9fed..dce003fc16 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -29,8 +29,10 @@ public partial class CopilotService { public event Action? OnOrchestratorPhaseChanged; // groupId, phase, detail - /// Maximum time a single worker is allowed to run before being cancelled. - private static readonly TimeSpan WorkerExecutionTimeout = TimeSpan.FromMinutes(10); + /// Maximum time a single worker is allowed to run before being cancelled. + /// Set high (60 min) because the smart watchdog (events.jsonl freshness) handles dead + /// session detection in ~90s. This is only an absolute backstop. + private static readonly TimeSpan WorkerExecutionTimeout = TimeSpan.FromMinutes(60); // Per-session semaphores to prevent concurrent model switches during rapid dispatch private readonly ConcurrentDictionary _modelSwitchLocks = new(); From eedf8e5946436d4310e2da187ca1730d1b627c55 Mon Sep 17 00:00:00 2001 From: Shane Neuville Date: Tue, 10 Mar 2026 23:33:45 -0500 Subject: [PATCH 5/5] Address PR #302 review: fix 3 CRITICALs and 3 MODERATEs CRITICAL fixes: - TriggerToolHealthRecovery: complete INV-1 cleanup (was missing 8+ fields, OnSessionComplete, CancelProcessingWatchdog/TurnEndFallback, proper FlushedResponse+CurrentResponse for TCS result) - MonitorAndSynthesizeAsync: wrap IsProcessing mutation in InvokeOnUI with TCS synchronization, add ResponseCompletion.TrySetResult, complete all companion field cleanup - CancelToolHealthCheck added to all 14 cleanup paths (AbortSessionAsync, ReconnectAsync, DisposeAsync, CloseSessionCoreAsync, CompleteResponse, watchdog timeouts) MODERATE fixes: - Separate ToolHealthStaleChecks counter from WatchdogCaseAResets to prevent cross-system interference - WorkerExecutionTimeoutRemote (10 min) for remote/demo mode where smart watchdog is unavailable; keep 60 min for normal mode - Fix ScheduleToolHealthCheck timer/cancel race: create dormant, store via Interlocked.Exchange, then start with timer.Change() MINOR: Replace unreachable code after retry loop with UnreachableException Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Services/CopilotService.Codespace.cs | 1 + PolyPilot/Services/CopilotService.Events.cs | 56 +++++++++++++---- .../Services/CopilotService.Organization.cs | 61 ++++++++++++++++--- PolyPilot/Services/CopilotService.cs | 9 +++ 4 files changed, 109 insertions(+), 18 deletions(-) diff --git a/PolyPilot/Services/CopilotService.Codespace.cs b/PolyPilot/Services/CopilotService.Codespace.cs index 943535958e..69e5288a22 100644 --- a/PolyPilot/Services/CopilotService.Codespace.cs +++ b/PolyPilot/Services/CopilotService.Codespace.cs @@ -788,6 +788,7 @@ private async Task ResumeCodespaceSessionsAsync(SessionGroup group, Cancellation state.Info.WorkingDirectory = codespaceWorkDir; CancelProcessingWatchdog(state); + CancelToolHealthCheck(state); var newState = new SessionState { Session = newSession, diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index 1fd6a773f9..bde7ecaf91 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -664,6 +664,7 @@ await notifService.SendNotificationAsync( var errMsg = Models.ErrorMessageHelper.HumanizeMessage(err.Data?.Message ?? "Unknown error"); CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); @@ -875,10 +876,12 @@ private void CompleteResponse(SessionState state, long? expectedGeneration = nul CancelProcessingWatchdog(state); // Also cancel any pending TurnEnd→Idle fallback — CompleteResponse is now executing CancelTurnEndFallback(state); + CancelToolHealthCheck(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; state.FallbackCanceledByTurnStart = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); + Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); state.Info.IsResumed = false; // Clear after first successful turn var response = state.CurrentResponse.ToString(); if (!string.IsNullOrWhiteSpace(response)) @@ -1480,9 +1483,6 @@ private static void CancelToolHealthCheck(SessionState state) /// private void ScheduleToolHealthCheck(SessionState state, string sessionName) { - // Cancel any previous health check timer - CancelToolHealthCheck(state); - // Skip in demo/remote mode where we can't probe the local server if (IsDemoMode || IsRemoteMode) return; @@ -1507,6 +1507,7 @@ private void ScheduleToolHealthCheck(SessionState state, string sessionName) { // Events are still flowing - tool is legitimately running // Schedule another check + Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); Debug($"[TOOL-HEALTH] '{sessionName}' events flowing — rescheduling health check"); ScheduleToolHealthCheck(state, sessionName); return; @@ -1523,7 +1524,7 @@ private void ScheduleToolHealthCheck(SessionState state, string sessionName) // Server TCP port is alive, but no events for 30s. The connection might be dead. // Increment the stale check counter and check if we should recover. - var staleChecks = Interlocked.Increment(ref state.WatchdogCaseAResets); + var staleChecks = Interlocked.Increment(ref state.ToolHealthStaleChecks); if (staleChecks > WatchdogMaxToolAliveResets) { Debug($"[TOOL-HEALTH] '{sessionName}' {staleChecks} stale health checks — assuming dead connection"); @@ -1539,9 +1540,11 @@ private void ScheduleToolHealthCheck(SessionState state, string sessionName) { Debug($"[TOOL-HEALTH] '{sessionName}' check failed: {ex.Message}"); } - }, null, ToolHealthCheckIntervalMs, Timeout.Infinite); + }, null, Timeout.Infinite, Timeout.Infinite); // Don't start yet — store first to avoid race - Interlocked.Exchange(ref state.ToolHealthCheckTimer, timer); + var prev = Interlocked.Exchange(ref state.ToolHealthCheckTimer, timer); + prev?.Dispose(); + timer.Change(ToolHealthCheckIntervalMs, Timeout.Infinite); // Now start } /// @@ -1551,6 +1554,8 @@ private void ScheduleToolHealthCheck(SessionState state, string sessionName) private void TriggerToolHealthRecovery(SessionState state, string sessionName, string reason) { CancelToolHealthCheck(state); + CancelProcessingWatchdog(state); + CancelTurnEndFallback(state); var activeTools = Volatile.Read(ref state.ActiveToolCallCount); Debug($"[TOOL-HEALTH] '{sessionName}' triggering recovery: {reason} (activeTools={activeTools})"); @@ -1561,13 +1566,40 @@ private void TriggerToolHealthRecovery(SessionState state, string sessionName, s OnError?.Invoke(sessionName, $"Tool execution stuck ({reason}). Session recovered automatically."); - // Clear the stuck state - state.Info.IsProcessing = false; + // Full cleanup mirroring CompleteResponse — missing fields here caused stuck sessions Interlocked.Exchange(ref state.ActiveToolCallCount, 0); - Interlocked.Exchange(ref state.SendingFlag, 0); state.HasUsedToolsThisTurn = false; - state.ResponseCompletion?.TrySetResult(state.CurrentResponse.ToString()); - FlushCurrentResponse(state); + state.FallbackCanceledByTurnStart = false; + Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); + Interlocked.Exchange(ref state.WatchdogCaseAResets, 0); + Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); + + // Build full response: flushed mid-turn text + remaining current text + var response = state.CurrentResponse.ToString(); + var fullResponse = state.FlushedResponse.Length > 0 + ? (string.IsNullOrEmpty(response) + ? state.FlushedResponse.ToString() + : state.FlushedResponse + "\n\n" + response) + : response; + + state.CurrentResponse.Clear(); + state.FlushedResponse.Clear(); + state.PendingReasoningMessages.Clear(); + + state.Info.IsProcessing = false; + state.Info.IsResumed = false; + Interlocked.Exchange(ref state.SendingFlag, 0); + state.Info.ProcessingStartedAt = null; + state.Info.ToolCallCount = 0; + state.Info.ProcessingPhase = 0; + state.Info.ClearPermissionDenials(); + + state.ResponseCompletion?.TrySetResult(fullResponse); + + Debug($"[TOOL-HEALTH-COMPLETE] '{sessionName}' recovery finished (responseLen={fullResponse.Length})"); + + var summary = fullResponse.Length > 0 ? (fullResponse.Length > 100 ? fullResponse[..100] + "..." : fullResponse) : ""; + OnSessionComplete?.Invoke(sessionName, summary); OnStateChanged?.Invoke(); }); } @@ -1851,6 +1883,7 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session return; } CancelProcessingWatchdog(state); + CancelToolHealthCheck(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); @@ -1988,6 +2021,7 @@ private async Task TryRecoverPermissionAsync(SessionState state, string sessionN // Cancel old watchdog AND TurnEnd fallback BEFORE creating new state CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); // Bug B fix: Cancel the old ResponseCompletion TCS so the original // SendPromptAsync awaiter doesn't hang forever. diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index dce003fc16..86f313047e 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Diagnostics; using System.Text.Json; using System.Text.RegularExpressions; using PolyPilot.Models; @@ -33,6 +34,7 @@ public partial class CopilotService /// Set high (60 min) because the smart watchdog (events.jsonl freshness) handles dead /// session detection in ~90s. This is only an absolute backstop. private static readonly TimeSpan WorkerExecutionTimeout = TimeSpan.FromMinutes(60); + private static readonly TimeSpan WorkerExecutionTimeoutRemote = TimeSpan.FromMinutes(10); // Per-session semaphores to prevent concurrent model switches during rapid dispatch private readonly ConcurrentDictionary _modelSwitchLocks = new(); @@ -1415,8 +1417,7 @@ private async Task ExecuteWorkerAsync(string workerName, string ta return new WorkerResult(workerName, null, false, ex.Message, sw.Elapsed); } } - // Should never reach here, but just in case - return new WorkerResult(workerName, null, false, "Max retries exceeded", sw.Elapsed); + throw new UnreachableException(); // for loop always returns or continues } private async Task SendPromptAndWaitAsync(string sessionName, string prompt, CancellationToken cancellationToken, string? originalPrompt = null) @@ -1425,7 +1426,7 @@ private async Task SendPromptAndWaitAsync(string sessionName, string pro // Do NOT capture state and await its TCS separately: reconnection replaces the state // object, orphaning the old TCS and causing a 10-minute hang. using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.CancelAfter(WorkerExecutionTimeout); + cts.CancelAfter(IsDemoMode || IsRemoteMode ? WorkerExecutionTimeoutRemote : WorkerExecutionTimeout); // Wire CTS to ResponseCompletion TCS so the 10-minute timeout actually cancels the await. // Must look up from _sessions dict (not captured ref) since reconnect replaces state. await using var ctsReg = cts.Token.Register(() => @@ -1671,10 +1672,56 @@ private async Task MonitorAndSynthesizeAsync(PendingOrchestration pending, Cance if (_sessions.TryGetValue(workerName, out var workerState) && workerState.Info.IsProcessing) { Debug($"[DISPATCH] Clearing stuck IsProcessing on '{workerName}' before re-dispatch"); - workerState.Info.IsProcessing = false; - Interlocked.Exchange(ref workerState.ActiveToolCallCount, 0); - Interlocked.Exchange(ref workerState.SendingFlag, 0); - workerState.HasUsedToolsThisTurn = false; + // Cancel timers first (thread-safe — use Interlocked internally) + CancelProcessingWatchdog(workerState); + CancelTurnEndFallback(workerState); + CancelToolHealthCheck(workerState); + + // Must run on UI thread per INV-2; use TCS to synchronize + var tcs = new TaskCompletionSource(); + InvokeOnUI(() => + { + try + { + if (!workerState.Info.IsProcessing) { tcs.TrySetResult(true); return; } + + // Full cleanup mirroring CompleteResponse / tool-health recovery + Interlocked.Exchange(ref workerState.ActiveToolCallCount, 0); + Interlocked.Exchange(ref workerState.SendingFlag, 0); + Interlocked.Exchange(ref workerState.SuccessfulToolCountThisTurn, 0); + Interlocked.Exchange(ref workerState.WatchdogCaseAResets, 0); + workerState.HasUsedToolsThisTurn = false; + workerState.FallbackCanceledByTurnStart = false; + workerState.Info.IsResumed = false; + workerState.Info.ProcessingStartedAt = null; + workerState.Info.ToolCallCount = 0; + workerState.Info.ProcessingPhase = 0; + workerState.Info.ClearPermissionDenials(); + + var response = workerState.CurrentResponse.ToString(); + var fullResponse = workerState.FlushedResponse.Length > 0 + ? (string.IsNullOrEmpty(response) + ? workerState.FlushedResponse.ToString() + : workerState.FlushedResponse + "\n\n" + response) + : response; + + workerState.CurrentResponse.Clear(); + workerState.FlushedResponse.Clear(); + workerState.PendingReasoningMessages.Clear(); + workerState.Info.IsProcessing = false; + + workerState.ResponseCompletion?.TrySetResult(fullResponse); + var summary = fullResponse.Length > 0 ? (fullResponse.Length > 100 ? fullResponse[..100] + "..." : fullResponse) : ""; + OnSessionComplete?.Invoke(workerName, summary); + OnStateChanged?.Invoke(); + tcs.TrySetResult(true); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + }); + await tcs.Task; } } diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index ba95d7a51d..9bdb631d78 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -453,6 +453,7 @@ private class SessionState /// If no tool completion event arrives within ToolHealthCheckIntervalMs, we do an active health /// check to detect dead connections early (instead of waiting for the 600s watchdog timeout). public Timer? ToolHealthCheckTimer; + public int ToolHealthStaleChecks; // Separate from WatchdogCaseAResets — health check's own stale counter /// Timestamp when the most recent tool started. Used by the tool health check to /// determine if a tool has been running too long without any events. public long ToolStartedAtTicks; @@ -845,6 +846,7 @@ public async Task ReconnectAsync(ConnectionSettings settings, CancellationToken { CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); try { if (state.Session != null) await state.Session.DisposeAsync(); } catch { } } _sessions.Clear(); @@ -2636,6 +2638,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis // Cancel old watchdog AND TurnEnd fallback BEFORE creating new state — they share Info/TCS CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); Debug($"[RECONNECT] '{sessionName}' replacing state (old handler will be orphaned, " + $"old session disposed, new session={newSession.SessionId})"); var newState = new SessionState @@ -2700,6 +2703,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis OnError?.Invoke(sessionName, $"Session disconnected and reconnect failed: {Models.ErrorMessageHelper.Humanize(retryEx)}"); CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); FlushCurrentResponse(state); Debug($"[ERROR] '{sessionName}' reconnect+retry failed, clearing IsProcessing"); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); @@ -2721,6 +2725,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis OnError?.Invoke(sessionName, $"SendAsync failed: {Models.ErrorMessageHelper.Humanize(ex)}"); CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); FlushCurrentResponse(state); Debug($"[ERROR] '{sessionName}' SendAsync failed, clearing IsProcessing (error={ex.Message})"); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); @@ -2884,6 +2889,7 @@ public async Task AbortSessionAsync(string sessionName, bool markAsInterrupted = // Cancel any pending TurnEnd→Idle fallback so it doesn't fire CompleteResponse after abort CancelTurnEndFallback(state); CancelProcessingWatchdog(state); + CancelToolHealthCheck(state); state.FlushedResponse.Clear(); state.PendingReasoningMessages.Clear(); state.Info.ClearPermissionDenials(); // INV-1: clear on all termination paths @@ -2973,6 +2979,7 @@ await InvokeOnUIAsync(() => OnError?.Invoke(sessionName, $"Soft steer failed: {Models.ErrorMessageHelper.Humanize(ex)}"); CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); FlushCurrentResponse(state); Debug($"[STEER-ERROR] '{sessionName}' soft steer SendAsync failed, clearing IsProcessing (error={ex.Message})"); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); @@ -3426,6 +3433,7 @@ internal async Task CloseSessionCoreAsync(string name, bool notifyUi) // Cancel any pending timers so they don't fire on torn-down state after session removal CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); // Dispose the SDK session AFTER UI has updated — DisposeAsync talks to the CLI // process and may trigger additional SDK events on background threads. Running it @@ -3476,6 +3484,7 @@ public async ValueTask DisposeAsync() { CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); if (state.Session is not null) try { await state.Session.DisposeAsync(); } catch { } }