diff --git a/PolyPilot.Tests/ProcessingWatchdogTests.cs b/PolyPilot.Tests/ProcessingWatchdogTests.cs index eeb6538c0d..da216d4dd6 100644 --- a/PolyPilot.Tests/ProcessingWatchdogTests.cs +++ b/PolyPilot.Tests/ProcessingWatchdogTests.cs @@ -2457,9 +2457,10 @@ public void WatchdogMaxToolAliveResets_BoundsMaxStuckTime() [Fact] public void CaseA_ExceedingMaxResets_FallsThroughToKill_InSource() { - // Verify the watchdog's Case A checks WatchdogMaxToolAliveResets and falls - // through when exceeded. This is the core fix for the stuck-session bug where - // a dead session's tool appears active but no events ever arrive. + // Verify the watchdog's Case A uses events.jsonl freshness checks and falls + // through when the file is stale and the confirmation cycle is exceeded. + // This is the core fix for the stuck-session bug where a dead session's tool + // appears active but no events ever arrive. var source = File.ReadAllText( Path.Combine(GetRepoRoot(), "PolyPilot", "Services", "CopilotService.Events.cs")); var methodIdx = source.IndexOf("private async Task RunProcessingWatchdogAsync"); @@ -2467,10 +2468,10 @@ public void CaseA_ExceedingMaxResets_FallsThroughToKill_InSource() var endIdx = source.IndexOf(" private readonly ConcurrentDictionary", methodIdx); var watchdogBody = source.Substring(methodIdx, endIdx - methodIdx); - // Case A must reference WatchdogMaxToolAliveResets - Assert.True(watchdogBody.Contains("WatchdogMaxToolAliveResets"), - "Case A must check WatchdogMaxToolAliveResets to cap consecutive resets"); - // Case A must increment WatchdogCaseAResets + // Case A must check events.jsonl freshness + Assert.True(watchdogBody.Contains("events.jsonl"), + "Case A must check events.jsonl freshness to distinguish active tools from dead connections"); + // Case A must increment WatchdogCaseAResets for confirmation cycles Assert.True(watchdogBody.Contains("WatchdogCaseAResets"), "Case A must track reset count via state.WatchdogCaseAResets"); } diff --git a/PolyPilot.Tests/SessionOrganizationTests.cs b/PolyPilot.Tests/SessionOrganizationTests.cs index 2e1e5df14d..e123df7a22 100644 --- a/PolyPilot.Tests/SessionOrganizationTests.cs +++ b/PolyPilot.Tests/SessionOrganizationTests.cs @@ -1364,6 +1364,22 @@ public void BuiltInPresets_IncludePRReviewSquad() Assert.NotNull(prSquad.WorkerSystemPrompts); Assert.Equal(prSquad.WorkerModels.Length, prSquad.WorkerSystemPrompts!.Length); } + + [Fact] + public void BuiltInPresets_IncludeSkillValidator() + { + var skillValidator = GroupPreset.BuiltIn.FirstOrDefault(p => p.Name == "Skill Validator"); + Assert.NotNull(skillValidator); + Assert.Equal(2, skillValidator!.WorkerModels.Length); + Assert.Equal(MultiAgentMode.OrchestratorReflect, skillValidator.Mode); + Assert.Equal("⚖️", skillValidator.Emoji); + Assert.NotNull(skillValidator.SharedContext); + Assert.NotNull(skillValidator.RoutingContext); + Assert.NotNull(skillValidator.WorkerSystemPrompts); + Assert.Equal(2, skillValidator.WorkerSystemPrompts!.Length); + Assert.All(skillValidator.WorkerSystemPrompts, p => Assert.False(string.IsNullOrWhiteSpace(p))); + Assert.NotNull(skillValidator.MaxReflectIterations); + } } public class GroupModelAnalyzerTests @@ -1766,7 +1782,7 @@ public class MultiAgentScenarioTests /// /// User flow: /// 1. Click 🚀 Preset in sidebar toolbar - /// 2. Preset picker appears showing 2 built-in templates + /// 2. Preset picker appears showing 3 built-in templates /// 3. Select "PR Review Squad" (📋) /// 4. System creates: Orchestrator (claude-opus-4.6) + 5 Workers /// 5. Sidebar shows group with mode selector set to "🎯 Orchestrator" @@ -1777,7 +1793,7 @@ public void Scenario_CreateGroupFromPreset() { // Step 1-2: User sees built-in presets var presets = GroupPreset.BuiltIn; - Assert.Equal(2, presets.Length); + Assert.Equal(3, presets.Length); // Step 3: User picks "PR Review Squad" var prReview = presets.First(p => p.Name == "PR Review Squad"); diff --git a/PolyPilot/Models/ModelCapabilities.cs b/PolyPilot/Models/ModelCapabilities.cs index 13634ab818..c8773e8695 100644 --- a/PolyPilot/Models/ModelCapabilities.cs +++ b/PolyPilot/Models/ModelCapabilities.cs @@ -344,6 +344,197 @@ public record GroupPreset(string Name, string Description, string Emoji, MultiAg """, MaxReflectIterations = 10, }, + + new GroupPreset( + "Skill Validator", "Two evaluators assess your skill from different angles — dotnet empirical testing vs. Anthropic design review — orchestrator builds consensus", + "⚖️", MultiAgentMode.OrchestratorReflect, + "claude-opus-4.6", new[] { "claude-sonnet-4.6", "claude-sonnet-4.6" }) + { + WorkerSystemPrompts = new[] + { + """ + You are the Dotnet Skill Validator. You evaluate skills empirically using the methodology from the dotnet/skills skill-validator tool. + + ## Your Evaluation Approach + + For each skill under evaluation, perform these steps: + + ### 1. Inspect the Skill Definition + - Read the SKILL.md file carefully + - Read the skill's `tests/eval.yaml` if present + - Identify what scenarios the skill claims to improve + + ### 2. Empirical Baseline Comparison + - For each scenario in the skill's eval.yaml (or infer representative scenarios from SKILL.md): + a. Describe what the agent response would look like WITHOUT the skill (baseline) + b. Describe what the agent response looks like WITH the skill + c. Assess: token efficiency, tool call count, task completion, error rate + + ### 3. Pairwise Comparative Judgment + - Compare baseline vs. skill-augmented performance side-by-side + - Apply position-swap bias mitigation: consider both orderings in your judgment + - Rate improvement on a scale of -1 (degraded) to +1 (strong improvement) with confidence + + ### 4. Statistical Assessment + - Estimate variance across the scenario set + - Flag scenarios where the skill helps most vs. least + - Identify scenarios where the skill might cause regressions + + ### 5. Verdict + Format your verdict as: + ``` + ## Dotnet Validator Verdict + **Overall Score**: X/10 + **Confidence**: High / Medium / Low + **Verdict**: KEEP / IMPROVE / REMOVE + + ### Strengths + - [specific strengths with evidence] + + ### Weaknesses + - [specific weaknesses with evidence] + + ### Suggested Improvements + - [concrete, actionable suggestions] + ``` + + ## Rules + - Be data-driven: cite specific scenarios and observed behaviors + - Focus on measurable outcomes: does it reduce tokens? improve task completion? reduce errors? + - Don't recommend keeping a skill that shows no measurable improvement in your empirical analysis + """, + + """ + You are the Anthropic Skill Evaluator. You evaluate skills through the lens of prompt engineering quality, trigger accuracy, and agent guidance design. + + ## Your Evaluation Approach + + For each skill under evaluation, assess these dimensions: + + ### 1. Description Quality (Trigger Accuracy) + - Is the description specific enough to trigger reliably for its intended use cases? + - Is it too broad — will it trigger for unintended tasks? + - Does it include enough trigger phrases/keywords to match user intent? + - Rate trigger precision (0-10) and recall (0-10) + + ### 2. Instruction Clarity + - Are the instructions in SKILL.md clear, actionable, and unambiguous? + - Do they tell the agent *exactly* what to do and in what order? + - Are there missing edge cases or situations the skill doesn't handle? + - Does the skill avoid over-constraining the agent in ways that limit helpfulness? + + ### 3. Scope Appropriateness + - Is the skill focused on a single, well-defined capability? + - Is the skill too broad (trying to do too much) or too narrow (not useful enough)? + - Does the skill overlap significantly with other skills? (potential conflicts) + + ### 4. Test Coverage + - Does the eval.yaml cover the happy path? + - Does it cover edge cases and failure modes? + - Are negative test cases present (things the skill should NOT do)? + + ### 5. Verdict + Format your verdict as: + ``` + ## Anthropic Evaluator Verdict + **Overall Score**: X/10 + **Trigger Precision**: X/10 | **Trigger Recall**: X/10 + **Verdict**: KEEP / IMPROVE / REMOVE + + ### Strengths + - [specific strengths] + + ### Weaknesses + - [specific weaknesses] + + ### Suggested Improvements + - [concrete rewrites or additions with examples] + ``` + + ## Rules + - Be concrete: quote specific lines from SKILL.md when critiquing + - Focus on prompt quality, not empirical test results + - Suggest specific rewrites, not vague advice like "be clearer" + """, + }, + SharedContext = """ + ## Skill Evaluation Standards + + Both evaluators assess the same skill and produce independent verdicts. + A good skill must satisfy BOTH evaluators to be marked KEEP. + + ### What makes a skill worth keeping + - Measurable improvement in task completion (Dotnet validator perspective) + - Clear, precise description that triggers reliably (Anthropic evaluator perspective) + - Focused scope — does one thing well + - Actionable instructions that guide the agent without over-constraining it + - Adequate test coverage + + ### What warrants IMPROVE + - Good intent but fixable gaps (bad trigger description, missing scenarios, ambiguous instructions) + - One evaluator says KEEP but the other says REMOVE with specific concerns + + ### What warrants REMOVE + - No measurable improvement in empirical testing + - Trigger description too broad/narrow to be useful + - Instructions that would cause regressions or confusion + + ### Consensus Rule + - KEEP requires both evaluators to say KEEP or one KEEP + one IMPROVE + - IMPROVE if the evaluators disagree or both say IMPROVE + - REMOVE if either evaluator says REMOVE with strong evidence + """, + RoutingContext = """ + ## Skill Validator Orchestration + + You orchestrate two skill evaluators who assess skills from different angles. Your role is to: + 1. Dispatch the skill to BOTH evaluators simultaneously (or sequentially if needed) + 2. Collect their verdicts + 3. Identify where they agree and disagree + 4. Build a consensus verdict explaining which suggestions to adopt and why + + ### Worker Names + - **worker-1** = Dotnet Skill Validator (empirical, outcome-focused) + - **worker-2** = Anthropic Skill Evaluator (prompt-design-focused) + + ### Dispatch Pattern + 1. **First dispatch**: Send the skill content to BOTH workers. Include the full SKILL.md content and eval.yaml if available. + 2. **After both complete**: Compare their verdicts. Identify agreements (high confidence) and disagreements (requires judgment). + 3. **Build consensus**: Produce a final report that explains which worker's suggestions are adopted, which are declined, and why. + + ### Consensus Report Format + ``` + ## Skill Validator Consensus Report: [Skill Name] + + ### Summary + **Dotnet Verdict**: KEEP/IMPROVE/REMOVE (X/10) + **Anthropic Verdict**: KEEP/IMPROVE/REMOVE (X/10) + **Consensus**: KEEP / IMPROVE / REMOVE + + ### Points of Agreement (High Confidence) + - [issues both evaluators flagged] + + ### Points of Disagreement (Requires Judgment) + - [Dotnet says X, Anthropic says Y — adopted: Z because ...] + + ### Adopted Suggestions + - [suggestions we are recommending, with rationale] + + ### Declined Suggestions + - [suggestions we are NOT adopting, with rationale] + + ### Final Recommendation + [1-2 sentence actionable summary] + ``` + + ### Rules + - Always explain WHY suggestions are adopted or declined + - Where evaluators disagree, explain the tradeoff and make a reasoned judgment + - Highlight suggestions both evaluators agree on as high-confidence improvements + - NEVER emit [[GROUP_REFLECT_COMPLETE]] until both evaluators have responded and a consensus report is produced + """, + MaxReflectIterations = 6, + }, }; } diff --git a/PolyPilot/Services/CopilotService.Codespace.cs b/PolyPilot/Services/CopilotService.Codespace.cs index 943535958e..69e5288a22 100644 --- a/PolyPilot/Services/CopilotService.Codespace.cs +++ b/PolyPilot/Services/CopilotService.Codespace.cs @@ -788,6 +788,7 @@ private async Task ResumeCodespaceSessionsAsync(SessionGroup group, Cancellation state.Info.WorkingDirectory = codespaceWorkDir; CancelProcessingWatchdog(state); + CancelToolHealthCheck(state); var newState = new SessionState { Session = newSession, diff --git a/PolyPilot/Services/CopilotService.Events.cs b/PolyPilot/Services/CopilotService.Events.cs index 6d3ee23701..bde7ecaf91 100644 --- a/PolyPilot/Services/CopilotService.Events.cs +++ b/PolyPilot/Services/CopilotService.Events.cs @@ -308,6 +308,9 @@ void Invoke(Action action) if (toolStart.Data == null) break; Interlocked.Increment(ref state.ActiveToolCallCount); state.HasUsedToolsThisTurn = true; // volatile field — no explicit barrier needed + // Record tool start time and schedule health check + Interlocked.Exchange(ref state.ToolStartedAtTicks, DateTime.UtcNow.Ticks); + ScheduleToolHealthCheck(state, sessionName); if (state.Info.ProcessingPhase < 3) { state.Info.ProcessingPhase = 3; // Working @@ -362,6 +365,8 @@ void Invoke(Action action) case ToolExecutionCompleteEvent toolDone: if (toolDone.Data == null) break; Interlocked.Decrement(ref state.ActiveToolCallCount); + // Cancel the tool health check timer since tool completed normally + CancelToolHealthCheck(state); Interlocked.Increment(ref state.Info._toolCallCount); var completeCallId = toolDone.Data.ToolCallId ?? ""; var completeToolName = toolDone.Data?.GetType().GetProperty("ToolName")?.GetValue(toolDone.Data)?.ToString(); @@ -444,6 +449,7 @@ void Invoke(Action action) case AssistantTurnStartEvent: // Cancel any pending TurnEnd→Idle fallback — another agent round is starting CancelTurnEndFallback(state); + state.FallbackCanceledByTurnStart = true; state.HasReceivedDeltasThisTurn = false; var phaseAdvancedToThinking = state.Info.ProcessingPhase < 2; if (phaseAdvancedToThinking) state.Info.ProcessingPhase = 2; // Thinking @@ -464,7 +470,14 @@ void Invoke(Action action) } // Schedule a delayed CompleteResponse in case SessionIdleEvent never arrives (SDK bug #299). // Cancelled by AssistantTurnStartEvent (another round starting) or SessionIdleEvent (normal path). + // If TurnStart previously canceled the fallback, this re-arms it — creating the + // self-healing loop: TurnEnd → TurnStart cancel → TurnEnd re-arm → fallback fires. { + if (state.FallbackCanceledByTurnStart) + { + Debug($"[TURNEND-FALLBACK] '{sessionName}' re-arming fallback timer (was canceled by TurnStart)"); + state.FallbackCanceledByTurnStart = false; + } var turnEndGen = Interlocked.Read(ref state.ProcessingGeneration); var idleFallbackCts = new CancellationTokenSource(); // Capture token BEFORE publishing so CancelTurnEndFallback on another thread @@ -651,6 +664,7 @@ await notifService.SendNotificationAsync( var errMsg = Models.ErrorMessageHelper.HumanizeMessage(err.Data?.Message ?? "Unknown error"); CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); @@ -862,9 +876,12 @@ private void CompleteResponse(SessionState state, long? expectedGeneration = nul CancelProcessingWatchdog(state); // Also cancel any pending TurnEnd→Idle fallback — CompleteResponse is now executing CancelTurnEndFallback(state); + CancelToolHealthCheck(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; + state.FallbackCanceledByTurnStart = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); + Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); state.Info.IsResumed = false; // Clear after first successful turn var response = state.CurrentResponse.ToString(); if (!string.IsNullOrWhiteSpace(response)) @@ -1390,6 +1407,11 @@ private void HandleReflectionAdvanceResult(SessionState state, string response, /// If no SDK events arrive for this many seconds while a tool is actively executing, the session is considered stuck. /// This is much longer because legitimate tool executions (e.g., running UI tests, long builds) can take many minutes. internal const int WatchdogToolExecutionTimeoutSeconds = 600; + /// After the first Case A reset (tool running + server alive but no events), switch to this + /// shorter timeout for subsequent checks. This accelerates dead connection detection while still + /// allowing the first 600s for legitimate long-running tools. Total max stuck time becomes + /// 600s + (WatchdogMaxToolAliveResets × 60s) ≈ 12 minutes instead of 40 minutes. + internal const int WatchdogToolEscalationTimeoutSeconds = 60; // Sessions that USED tools but have none actively running — the model may be // thinking between tool rounds, but 600s is too long for a likely-dead session. internal const int WatchdogUsedToolsIdleTimeoutSeconds = 180; @@ -1408,8 +1430,16 @@ private void HandleReflectionAdvanceResult(SessionState state, string response, /// session's transport-level connection is broken (ConnectionLostException). Without this cap, /// Case A resets LastEventAtTicks indefinitely, and ProcessingStartedAt resets on each app /// restart — so neither the inactivity nor the max-time safety net ever fires. - /// (3+1) resets × 600s effective timeout ≈ 40 minutes max of Case A resets. - internal const int WatchdogMaxToolAliveResets = 3; + /// With escalation timeout (60s after first reset), total max stuck time is: + /// 600s (first) + 2 × 60s (escalation) = 720s ≈ 12 minutes. + internal const int WatchdogMaxToolAliveResets = 2; + + /// + /// Milliseconds after a tool starts to perform the first health check. If no events have + /// arrived since tool start, we verify the connection is still alive. This detects dead + /// connections within ~30s instead of waiting for the 600s watchdog timeout. + /// + internal const int ToolHealthCheckIntervalMs = 30_000; /// /// Milliseconds to wait after AssistantTurnEndEvent before firing CompleteResponse @@ -1436,6 +1466,144 @@ private static void CancelProcessingWatchdog(SessionState state) } } + /// + /// Cancels and disposes any pending tool health check timer. + /// + private static void CancelToolHealthCheck(SessionState state) + { + var prev = Interlocked.Exchange(ref state.ToolHealthCheckTimer, null); + prev?.Dispose(); + } + + /// + /// Schedules a tool health check to run after ToolHealthCheckIntervalMs. + /// If the tool completes before the timer fires, the timer is cancelled. + /// If the timer fires and no events have arrived since tool start, we check + /// if the connection is still alive and trigger recovery if it's dead. + /// + private void ScheduleToolHealthCheck(SessionState state, string sessionName) + { + // Skip in demo/remote mode where we can't probe the local server + if (IsDemoMode || IsRemoteMode) return; + + var checkGeneration = Interlocked.Read(ref state.ProcessingGeneration); + var timer = new Timer(_ => + { + try + { + // Verify we're still on the same turn + if (Interlocked.Read(ref state.ProcessingGeneration) != checkGeneration) return; + if (!state.Info.IsProcessing) return; + + var activeTools = Volatile.Read(ref state.ActiveToolCallCount); + if (activeTools <= 0) return; // Tool completed normally + + // Check if any events arrived since tool start + var lastEventTicks = Interlocked.Read(ref state.LastEventAtTicks); + var toolStartTicks = Interlocked.Read(ref state.ToolStartedAtTicks); + var eventsSinceToolStart = lastEventTicks > toolStartTicks; + + if (eventsSinceToolStart) + { + // Events are still flowing - tool is legitimately running + // Schedule another check + Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); + Debug($"[TOOL-HEALTH] '{sessionName}' events flowing — rescheduling health check"); + ScheduleToolHealthCheck(state, sessionName); + return; + } + + // No events since tool start. Check if server is alive. + var serverAlive = _serverManager.IsServerRunning; + if (!serverAlive) + { + Debug($"[TOOL-HEALTH] '{sessionName}' server is DEAD — triggering immediate recovery"); + TriggerToolHealthRecovery(state, sessionName, "server not responding"); + return; + } + + // Server TCP port is alive, but no events for 30s. The connection might be dead. + // Increment the stale check counter and check if we should recover. + var staleChecks = Interlocked.Increment(ref state.ToolHealthStaleChecks); + if (staleChecks > WatchdogMaxToolAliveResets) + { + Debug($"[TOOL-HEALTH] '{sessionName}' {staleChecks} stale health checks — assuming dead connection"); + TriggerToolHealthRecovery(state, sessionName, "no events after multiple health checks (connection likely dead)"); + return; + } + + Debug($"[TOOL-HEALTH] '{sessionName}' no events for {ToolHealthCheckIntervalMs/1000}s, server alive — " + + $"check {staleChecks}/{WatchdogMaxToolAliveResets}, scheduling another check"); + ScheduleToolHealthCheck(state, sessionName); + } + catch (Exception ex) + { + Debug($"[TOOL-HEALTH] '{sessionName}' check failed: {ex.Message}"); + } + }, null, Timeout.Infinite, Timeout.Infinite); // Don't start yet — store first to avoid race + + var prev = Interlocked.Exchange(ref state.ToolHealthCheckTimer, timer); + prev?.Dispose(); + timer.Change(ToolHealthCheckIntervalMs, Timeout.Infinite); // Now start + } + + /// + /// Triggers recovery when tool health check detects a dead connection. + /// Clears the stuck processing state and notifies the user. + /// + private void TriggerToolHealthRecovery(SessionState state, string sessionName, string reason) + { + CancelToolHealthCheck(state); + CancelProcessingWatchdog(state); + CancelTurnEndFallback(state); + + var activeTools = Volatile.Read(ref state.ActiveToolCallCount); + Debug($"[TOOL-HEALTH] '{sessionName}' triggering recovery: {reason} (activeTools={activeTools})"); + + InvokeOnUI(() => + { + if (!state.Info.IsProcessing) return; + + OnError?.Invoke(sessionName, $"Tool execution stuck ({reason}). Session recovered automatically."); + + // Full cleanup mirroring CompleteResponse — missing fields here caused stuck sessions + Interlocked.Exchange(ref state.ActiveToolCallCount, 0); + state.HasUsedToolsThisTurn = false; + state.FallbackCanceledByTurnStart = false; + Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); + Interlocked.Exchange(ref state.WatchdogCaseAResets, 0); + Interlocked.Exchange(ref state.ToolHealthStaleChecks, 0); + + // Build full response: flushed mid-turn text + remaining current text + var response = state.CurrentResponse.ToString(); + var fullResponse = state.FlushedResponse.Length > 0 + ? (string.IsNullOrEmpty(response) + ? state.FlushedResponse.ToString() + : state.FlushedResponse + "\n\n" + response) + : response; + + state.CurrentResponse.Clear(); + state.FlushedResponse.Clear(); + state.PendingReasoningMessages.Clear(); + + state.Info.IsProcessing = false; + state.Info.IsResumed = false; + Interlocked.Exchange(ref state.SendingFlag, 0); + state.Info.ProcessingStartedAt = null; + state.Info.ToolCallCount = 0; + state.Info.ProcessingPhase = 0; + state.Info.ClearPermissionDenials(); + + state.ResponseCompletion?.TrySetResult(fullResponse); + + Debug($"[TOOL-HEALTH-COMPLETE] '{sessionName}' recovery finished (responseLen={fullResponse.Length})"); + + var summary = fullResponse.Length > 0 ? (fullResponse.Length > 100 ? fullResponse[..100] + "..." : fullResponse) : ""; + OnSessionComplete?.Invoke(sessionName, summary); + OnStateChanged?.Invoke(); + }); + } + /// /// Cancels and disposes any pending TurnEnd→Idle fallback timer on the state. /// Mirrors the CancelProcessingWatchdog pattern: Cancel + Dispose to avoid @@ -1551,13 +1719,22 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session var useToolTimeout = hasActiveTool || (state.Info.IsResumed && !useResumeQuiescence); var useUsedToolsTimeout = !useToolTimeout && hasUsedTools && !hasActiveTool; + + // After the first Case A reset (tool running + server alive but no events arrived), + // switch to the escalation timeout. This allows the first 600s for legitimate long-running + // tools, but speeds up dead connection detection on subsequent checks. + var caseAResets = Volatile.Read(ref state.WatchdogCaseAResets); + var useEscalationTimeout = useToolTimeout && caseAResets > 0; + var effectiveTimeout = useResumeQuiescence ? WatchdogResumeQuiescenceTimeoutSeconds - : useToolTimeout - ? WatchdogToolExecutionTimeoutSeconds - : useUsedToolsTimeout - ? WatchdogUsedToolsIdleTimeoutSeconds - : WatchdogInactivityTimeoutSeconds; + : useEscalationTimeout + ? WatchdogToolEscalationTimeoutSeconds + : useToolTimeout + ? WatchdogToolExecutionTimeoutSeconds + : useUsedToolsTimeout + ? WatchdogUsedToolsIdleTimeoutSeconds + : WatchdogInactivityTimeoutSeconds; // Safety net: check absolute max processing time, but only if events have also // gone stale. If events are still flowing (elapsed < effectiveTimeout), the session @@ -1599,25 +1776,58 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session { if (hasActiveTool && !IsDemoMode && !IsRemoteMode) { - // Case A: check server TCP port + // Case A: check server TCP port + events.jsonl freshness var serverAlive = _serverManager.IsServerRunning; if (serverAlive) { - var resets = Interlocked.Increment(ref state.WatchdogCaseAResets); - if (resets > WatchdogMaxToolAliveResets) + // Check if the CLI is actively writing events for this session. + // events.jsonl is written by the CLI process, not our app. + // If recently modified → tool is actively running → wait indefinitely. + // If stale → connection is likely dead → kill immediately. + var eventsFileActive = false; + try { - // Too many consecutive resets with no real SDK events — the - // session's JSON-RPC connection is likely dead even though the - // shared persistent server is still alive. Fall through to kill. - Debug($"[WATCHDOG] '{sessionName}' Case A reset cap exceeded ({resets}/{WatchdogMaxToolAliveResets}) " + - $"— killing despite server alive (elapsed={elapsed:F0}s, totalProcessing={totalProcessingSeconds:F0}s)"); + var sessionId = state.Info.SessionId; + if (!string.IsNullOrEmpty(sessionId)) + { + var eventsPath = Path.Combine(SessionStatePath, sessionId, "events.jsonl"); + if (File.Exists(eventsPath)) + { + var lastWrite = File.GetLastWriteTimeUtc(eventsPath); + var fileAge = (DateTime.UtcNow - lastWrite).TotalSeconds; + eventsFileActive = fileAge < 60; // modified within last 60s + } + } } - else + catch { /* filesystem errors → fall through to reset-cap logic */ } + + if (eventsFileActive) { - Debug($"[WATCHDOG] '{sessionName}' {elapsed:F0}s inactivity but tool is running and server is alive — resetting timer " + - $"(reset #{resets}/{WatchdogMaxToolAliveResets}, timeout={effectiveTimeout}s, totalProcessing={totalProcessingSeconds:F0}s)"); + // Events file is fresh — tool is actively running. Wait indefinitely. + Debug($"[WATCHDOG] '{sessionName}' tool is running and events.jsonl is fresh — waiting indefinitely " + + $"(elapsed={elapsed:F0}s, totalProcessing={totalProcessingSeconds:F0}s)"); Interlocked.Exchange(ref state.LastEventAtTicks, DateTime.UtcNow.Ticks); - continue; // keep waiting — don't kill + Interlocked.Exchange(ref state.WatchdogCaseAResets, 0); // reset counter since tool is active + continue; + } + else + { + // Events file is stale or missing — connection is likely dead. + // Use the reset-cap as a safety buffer (1 more cycle to confirm). + var resets = Interlocked.Increment(ref state.WatchdogCaseAResets); + if (resets > 1) // Only need 1 confirmation cycle since we have file evidence + { + Debug($"[WATCHDOG] '{sessionName}' events.jsonl stale and reset count {resets} > 1 " + + $"— killing despite server alive (elapsed={elapsed:F0}s, totalProcessing={totalProcessingSeconds:F0}s)"); + // fall through to kill + } + else + { + Debug($"[WATCHDOG] '{sessionName}' events.jsonl stale but giving 1 more cycle " + + $"(reset #{resets}, elapsed={elapsed:F0}s, totalProcessing={totalProcessingSeconds:F0}s)"); + Interlocked.Exchange(ref state.LastEventAtTicks, DateTime.UtcNow.Ticks); + continue; + } } } else @@ -1673,6 +1883,7 @@ private async Task RunProcessingWatchdogAsync(SessionState state, string session return; } CancelProcessingWatchdog(state); + CancelToolHealthCheck(state); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); state.HasUsedToolsThisTurn = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); @@ -1810,6 +2021,7 @@ private async Task TryRecoverPermissionAsync(SessionState state, string sessionN // Cancel old watchdog AND TurnEnd fallback BEFORE creating new state CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); // Bug B fix: Cancel the old ResponseCompletion TCS so the original // SendPromptAsync awaiter doesn't hang forever. diff --git a/PolyPilot/Services/CopilotService.Organization.cs b/PolyPilot/Services/CopilotService.Organization.cs index 80daafb69e..86f313047e 100644 --- a/PolyPilot/Services/CopilotService.Organization.cs +++ b/PolyPilot/Services/CopilotService.Organization.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using System.Diagnostics; using System.Text.Json; using System.Text.RegularExpressions; using PolyPilot.Models; @@ -29,8 +30,11 @@ public partial class CopilotService { public event Action? OnOrchestratorPhaseChanged; // groupId, phase, detail - /// Maximum time a single worker is allowed to run before being cancelled. - private static readonly TimeSpan WorkerExecutionTimeout = TimeSpan.FromMinutes(10); + /// Maximum time a single worker is allowed to run before being cancelled. + /// Set high (60 min) because the smart watchdog (events.jsonl freshness) handles dead + /// session detection in ~90s. This is only an absolute backstop. + private static readonly TimeSpan WorkerExecutionTimeout = TimeSpan.FromMinutes(60); + private static readonly TimeSpan WorkerExecutionTimeoutRemote = TimeSpan.FromMinutes(10); // Per-session semaphores to prevent concurrent model switches during rapid dispatch private readonly ConcurrentDictionary _modelSwitchLocks = new(); @@ -1151,8 +1155,14 @@ private async Task SendViaOrchestratorAsync(string groupId, List members { InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(groupId, OrchestratorPhase.WaitingForWorkers, null)); - var workerTasks = assignments.Select(a => - ExecuteWorkerAsync(a.WorkerName, a.Task, prompt, cancellationToken)); + Debug($"[DISPATCH] Staggering {assignments.Count} workers with 1s delay"); + var workerTasks = new List>(); + foreach (var a in assignments) + { + workerTasks.Add(ExecuteWorkerAsync(a.WorkerName, a.Task, prompt, cancellationToken)); + if (workerTasks.Count < assignments.Count) + await Task.Delay(1000, cancellationToken); + } var results = await Task.WhenAll(workerTasks); // Phase 4: Synthesize — send worker results back to orchestrator @@ -1360,43 +1370,54 @@ private async Task ExecuteWorkerAsync(string workerName, string ta var workerPrompt = $"{identity}{worktreeNote}\n\nYour response will be collected and synthesized with other workers' responses.\n\n{sharedPrefix}## Original User Request (context)\n{originalPrompt}\n\n## Your Assigned Task\n{task}"; - try + const int maxRetries = 2; + for (int attempt = 1; attempt <= maxRetries; attempt++) { - Debug($"[DISPATCH] Worker '{workerName}' starting (prompt len={workerPrompt.Length})"); - var response = await SendPromptAndWaitAsync(workerName, workerPrompt, cancellationToken, originalPrompt: originalPrompt); - - // Worker revival: empty response means the session died (e.g., dead SSE stream - // after reconnect). Create a fresh session and retry once. - if (string.IsNullOrWhiteSpace(response)) + try { - Debug($"[DISPATCH] Worker '{workerName}' returned empty — attempting fresh session revival"); - if (_sessions.TryGetValue(workerName, out var deadState)) - { - try { await deadState.Session.DisposeAsync(); } catch { } + Debug($"[DISPATCH] Worker '{workerName}' starting (prompt len={workerPrompt.Length}, attempt={attempt})"); + var response = await SendPromptAndWaitAsync(workerName, workerPrompt, cancellationToken, originalPrompt: originalPrompt); - var workerMeta = GetSessionMeta(workerName); - var client = GetClientForGroup(workerMeta?.GroupId); - if (client != null) + // Worker revival: empty response means the session died (e.g., dead SSE stream + // after reconnect). Create a fresh session and retry once. + if (string.IsNullOrWhiteSpace(response)) + { + Debug($"[DISPATCH] Worker '{workerName}' returned empty — attempting fresh session revival"); + if (_sessions.TryGetValue(workerName, out var deadState)) { - var freshConfig = BuildFreshSessionConfig(deadState); - var freshSession = await client.CreateSessionAsync(freshConfig, cancellationToken); - deadState.Info.SessionId = freshSession.SessionId; - var freshState = new SessionState { Session = freshSession, Info = deadState.Info }; - _sessions[workerName] = freshState; - Debug($"[DISPATCH] Worker '{workerName}' revived with fresh session '{freshSession.SessionId}'"); - response = await SendPromptAndWaitAsync(workerName, workerPrompt, cancellationToken, originalPrompt: originalPrompt); + try { await deadState.Session.DisposeAsync(); } catch { } + + var workerMeta = GetSessionMeta(workerName); + var client = GetClientForGroup(workerMeta?.GroupId); + if (client != null) + { + var freshConfig = BuildFreshSessionConfig(deadState); + var freshSession = await client.CreateSessionAsync(freshConfig, cancellationToken); + deadState.Info.SessionId = freshSession.SessionId; + var freshState = new SessionState { Session = freshSession, Info = deadState.Info }; + _sessions[workerName] = freshState; + Debug($"[DISPATCH] Worker '{workerName}' revived with fresh session '{freshSession.SessionId}'"); + response = await SendPromptAndWaitAsync(workerName, workerPrompt, cancellationToken, originalPrompt: originalPrompt); + } } } - } - Debug($"[DISPATCH] Worker '{workerName}' completed (response len={response?.Length ?? 0}, elapsed={sw.Elapsed.TotalSeconds:F1}s)"); - return new WorkerResult(workerName, response, true, null, sw.Elapsed); - } - catch (Exception ex) - { - Debug($"[DISPATCH] Worker '{workerName}' FAILED: {ex.GetType().Name}: {ex.Message} (elapsed={sw.Elapsed.TotalSeconds:F1}s)"); - return new WorkerResult(workerName, null, false, ex.Message, sw.Elapsed); + Debug($"[DISPATCH] Worker '{workerName}' completed (response len={response?.Length ?? 0}, elapsed={sw.Elapsed.TotalSeconds:F1}s)"); + return new WorkerResult(workerName, response, true, null, sw.Elapsed); + } + catch (Exception ex) when (attempt < maxRetries && IsConnectionError(ex)) + { + Debug($"[DISPATCH] Worker '{workerName}' attempt {attempt} failed with {ex.GetType().Name} — retrying in 2s"); + await Task.Delay(2000, cancellationToken); + continue; + } + catch (Exception ex) + { + Debug($"[DISPATCH] Worker '{workerName}' FAILED: {ex.GetType().Name}: {ex.Message} (elapsed={sw.Elapsed.TotalSeconds:F1}s)"); + return new WorkerResult(workerName, null, false, ex.Message, sw.Elapsed); + } } + throw new UnreachableException(); // for loop always returns or continues } private async Task SendPromptAndWaitAsync(string sessionName, string prompt, CancellationToken cancellationToken, string? originalPrompt = null) @@ -1405,7 +1426,7 @@ private async Task SendPromptAndWaitAsync(string sessionName, string pro // Do NOT capture state and await its TCS separately: reconnection replaces the state // object, orphaning the old TCS and causing a 10-minute hang. using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.CancelAfter(WorkerExecutionTimeout); + cts.CancelAfter(IsDemoMode || IsRemoteMode ? WorkerExecutionTimeoutRemote : WorkerExecutionTimeout); // Wire CTS to ResponseCompletion TCS so the 10-minute timeout actually cancels the await. // Must look up from _sessions dict (not captured ref) since reconnect replaces state. await using var ctsReg = cts.Token.Register(() => @@ -1641,6 +1662,69 @@ private async Task MonitorAndSynthesizeAsync(PendingOrchestration pending, Cance InvokeOnUI(() => OnOrchestratorPhaseChanged?.Invoke(pending.GroupId, OrchestratorPhase.Dispatching, $"Re-dispatching {unstartedWorkers.Count} worker(s)")); + // BUG FIX: Clear stuck IsProcessing state on workers before re-dispatch. + // After an app restart, workers may have IsProcessing=true from a previous + // incomplete turn (e.g., tool.execution_start with no tool.execution_complete). + // This prevents SendPromptAsync from accepting new prompts. Clear it here + // so re-dispatch can actually send the prompt. + foreach (var workerName in unstartedWorkers) + { + if (_sessions.TryGetValue(workerName, out var workerState) && workerState.Info.IsProcessing) + { + Debug($"[DISPATCH] Clearing stuck IsProcessing on '{workerName}' before re-dispatch"); + // Cancel timers first (thread-safe — use Interlocked internally) + CancelProcessingWatchdog(workerState); + CancelTurnEndFallback(workerState); + CancelToolHealthCheck(workerState); + + // Must run on UI thread per INV-2; use TCS to synchronize + var tcs = new TaskCompletionSource(); + InvokeOnUI(() => + { + try + { + if (!workerState.Info.IsProcessing) { tcs.TrySetResult(true); return; } + + // Full cleanup mirroring CompleteResponse / tool-health recovery + Interlocked.Exchange(ref workerState.ActiveToolCallCount, 0); + Interlocked.Exchange(ref workerState.SendingFlag, 0); + Interlocked.Exchange(ref workerState.SuccessfulToolCountThisTurn, 0); + Interlocked.Exchange(ref workerState.WatchdogCaseAResets, 0); + workerState.HasUsedToolsThisTurn = false; + workerState.FallbackCanceledByTurnStart = false; + workerState.Info.IsResumed = false; + workerState.Info.ProcessingStartedAt = null; + workerState.Info.ToolCallCount = 0; + workerState.Info.ProcessingPhase = 0; + workerState.Info.ClearPermissionDenials(); + + var response = workerState.CurrentResponse.ToString(); + var fullResponse = workerState.FlushedResponse.Length > 0 + ? (string.IsNullOrEmpty(response) + ? workerState.FlushedResponse.ToString() + : workerState.FlushedResponse + "\n\n" + response) + : response; + + workerState.CurrentResponse.Clear(); + workerState.FlushedResponse.Clear(); + workerState.PendingReasoningMessages.Clear(); + workerState.Info.IsProcessing = false; + + workerState.ResponseCompletion?.TrySetResult(fullResponse); + var summary = fullResponse.Length > 0 ? (fullResponse.Length > 100 ? fullResponse[..100] + "..." : fullResponse) : ""; + OnSessionComplete?.Invoke(workerName, summary); + OnStateChanged?.Invoke(); + tcs.TrySetResult(true); + } + catch (Exception ex) + { + tcs.TrySetException(ex); + } + }); + await tcs.Task; + } + } + // Build a generic task from the original prompt for re-dispatched workers. // Materialize as an array so we can inspect individual task results even on partial failure. var redispatchTaskArray = unstartedWorkers diff --git a/PolyPilot/Services/CopilotService.cs b/PolyPilot/Services/CopilotService.cs index dde17728ca..9bdb631d78 100644 --- a/PolyPilot/Services/CopilotService.cs +++ b/PolyPilot/Services/CopilotService.cs @@ -445,6 +445,18 @@ private class SessionState /// WatchdogMaxToolAliveResets to prevent infinite resets when the session's JSON-RPC /// connection is dead but the shared persistent server is still alive. public int WatchdogCaseAResets; + /// True if the TurnEnd→Idle fallback was canceled by an AssistantTurnStartEvent. + /// Used for diagnostic logging: when the next TurnEnd re-arms the fallback, the log shows + /// the self-healing loop in action (TurnEnd → TurnStart cancel → TurnEnd re-arm). + public volatile bool FallbackCanceledByTurnStart; + /// Timer that fires shortly after a tool starts to verify the connection is still alive. + /// If no tool completion event arrives within ToolHealthCheckIntervalMs, we do an active health + /// check to detect dead connections early (instead of waiting for the 600s watchdog timeout). + public Timer? ToolHealthCheckTimer; + public int ToolHealthStaleChecks; // Separate from WatchdogCaseAResets — health check's own stale counter + /// Timestamp when the most recent tool started. Used by the tool health check to + /// determine if a tool has been running too long without any events. + public long ToolStartedAtTicks; } private void Debug(string message) @@ -834,6 +846,7 @@ public async Task ReconnectAsync(ConnectionSettings settings, CancellationToken { CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); try { if (state.Session != null) await state.Session.DisposeAsync(); } catch { } } _sessions.Clear(); @@ -2413,6 +2426,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis state.Info.ClearPermissionDenials(); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); // Reset stale tool count from previous turn state.HasUsedToolsThisTurn = false; // Reset stale tool flag from previous turn + state.FallbackCanceledByTurnStart = false; Interlocked.Exchange(ref state.SuccessfulToolCountThisTurn, 0); // Cancel any pending TurnEnd→Idle fallback from the previous turn CancelTurnEndFallback(state); @@ -2624,6 +2638,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis // Cancel old watchdog AND TurnEnd fallback BEFORE creating new state — they share Info/TCS CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); Debug($"[RECONNECT] '{sessionName}' replacing state (old handler will be orphaned, " + $"old session disposed, new session={newSession.SessionId})"); var newState = new SessionState @@ -2636,6 +2651,12 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis // orphaned old state can't pass generation checks on the new state. Interlocked.Exchange(ref newState.ProcessingGeneration, Interlocked.Read(ref state.ProcessingGeneration)); + // Reset tool tracking for the NEW connection. The old connection's + // tool state is stale — no tools have run on this connection yet. + // Without this, HasUsedToolsThisTurn=true from the dead connection + // inflates the watchdog timeout from 120s to 600s, making stuck + // sessions wait 5x longer than necessary to recover. + newState.HasUsedToolsThisTurn = false; Interlocked.Exchange(ref newState.ActiveToolCallCount, 0); Interlocked.Exchange(ref newState.SuccessfulToolCountThisTurn, 0); newState.IsMultiAgentSession = state.IsMultiAgentSession; @@ -2682,6 +2703,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis OnError?.Invoke(sessionName, $"Session disconnected and reconnect failed: {Models.ErrorMessageHelper.Humanize(retryEx)}"); CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); FlushCurrentResponse(state); Debug($"[ERROR] '{sessionName}' reconnect+retry failed, clearing IsProcessing"); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); @@ -2703,6 +2725,7 @@ public async Task SendPromptAsync(string sessionName, string prompt, Lis OnError?.Invoke(sessionName, $"SendAsync failed: {Models.ErrorMessageHelper.Humanize(ex)}"); CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); FlushCurrentResponse(state); Debug($"[ERROR] '{sessionName}' SendAsync failed, clearing IsProcessing (error={ex.Message})"); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); @@ -2866,6 +2889,7 @@ public async Task AbortSessionAsync(string sessionName, bool markAsInterrupted = // Cancel any pending TurnEnd→Idle fallback so it doesn't fire CompleteResponse after abort CancelTurnEndFallback(state); CancelProcessingWatchdog(state); + CancelToolHealthCheck(state); state.FlushedResponse.Clear(); state.PendingReasoningMessages.Clear(); state.Info.ClearPermissionDenials(); // INV-1: clear on all termination paths @@ -2955,6 +2979,7 @@ await InvokeOnUIAsync(() => OnError?.Invoke(sessionName, $"Soft steer failed: {Models.ErrorMessageHelper.Humanize(ex)}"); CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); FlushCurrentResponse(state); Debug($"[STEER-ERROR] '{sessionName}' soft steer SendAsync failed, clearing IsProcessing (error={ex.Message})"); Interlocked.Exchange(ref state.ActiveToolCallCount, 0); @@ -3408,6 +3433,7 @@ internal async Task CloseSessionCoreAsync(string name, bool notifyUi) // Cancel any pending timers so they don't fire on torn-down state after session removal CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); // Dispose the SDK session AFTER UI has updated — DisposeAsync talks to the CLI // process and may trigger additional SDK events on background threads. Running it @@ -3458,6 +3484,7 @@ public async ValueTask DisposeAsync() { CancelProcessingWatchdog(state); CancelTurnEndFallback(state); + CancelToolHealthCheck(state); if (state.Session is not null) try { await state.Session.DisposeAsync(); } catch { } }