diff --git a/.gitignore b/.gitignore index 8a6e0c6..05f21ec 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ coverage.out .claude/settings.local.json .private-journal/ a/ +runs/ diff --git a/CHANGELOG.md b/CHANGELOG.md index ecbe011..22d1c38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,33 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- **Library API hardening for v1.0** (#102, #103, #104, #106, #109): + - Typed enum-like strings for `CheckStatus` and `SuggestionKind` so consumers can switch-exhaust. Existing constants (`SuggestionRetryPattern`, etc.) retain their underlying string values. + - `tracker.WithVersionInfo(version, commit)` functional option replaces the CLI-only `DoctorConfig.TrackerVersion` / `TrackerCommit` fields. + - `DiagnoseConfig.LogWriter` / `AuditConfig.LogWriter` — optional `io.Writer` for non-fatal parse warnings. Nil is treated as `io.Discard` so library callers no longer see stray warnings on `os.Stderr`. The `tracker` CLI sets this to `io.Discard` for user-facing commands. `Doctor` has no warnings to suppress so it deliberately does not carry a `LogWriter` field. + - `Doctor`, `Diagnose`, `DiagnoseMostRecent`, `Audit`, `Simulate` now accept `context.Context`, honored by provider probes and binary version lookups. `getBinaryVersion` now uses `exec.CommandContext` with a 5-second timeout, matching `getDippinVersion`. + - Provider probe error bodies are now sanitized (API keys and bearer tokens stripped) before they land in `CheckDetail.Message`. + - `NDJSON` handler closures (pipeline, agent, LLM trace) now `recover()` from panics in the underlying writer so a misbehaving sink cannot crash the caller goroutine. Panic suppression is per-`NDJSONWriter` instance (not package-level), so one misbehaving sink cannot silence unrelated writers in the same process. 
+ - `Diagnose` now streams `activity.jsonl` with `bufio.Scanner` instead of `os.ReadFile` → `strings.Split`, matching `LoadActivityLog` and avoiding a memory spike on large runs. Scanner errors (1 MB line-length overflow, I/O) and `ctx.Err()` now propagate out of `Diagnose` as a real error — partial reports are never returned as success, so automation with deadlines can distinguish complete from truncated analysis. + +### Changed + +- **BREAKING** (library): + - `tracker.Doctor(cfg)` → `tracker.Doctor(ctx, cfg, opts...)`. + - `tracker.Diagnose(runDir)` → `tracker.Diagnose(ctx, runDir, opts...)`. + - `tracker.DiagnoseMostRecent(workdir)` → `tracker.DiagnoseMostRecent(ctx, workdir, opts...)`. + - `tracker.Audit(runDir)` → `tracker.Audit(ctx, runDir)`. (No config struct — Audit emits no suppressible warnings. Use `ListRuns` + `AuditConfig{LogWriter}` for bulk enumeration.) + - `tracker.Simulate(source)` → `tracker.Simulate(ctx, source)`. + - `tracker.ListRuns(workdir)` now accepts optional `...AuditConfig`. + - `tracker.NDJSONEvent` → `tracker.StreamEvent`. Wire-format JSON tags unchanged. + - `NDJSONWriter.Write` now returns `error` so callers can detect a broken stream. First failure is still logged to `os.Stderr` once (unchanged behavior); subsequent failures are surfaced via the return value. + - `DoctorConfig.TrackerVersion` and `DoctorConfig.TrackerCommit` removed — use `tracker.WithVersionInfo(version, commit)` instead. + - `CheckResult.Status` and `CheckDetail.Status` are now typed as `tracker.CheckStatus` (underlying string). Untyped string literal comparisons (`status == "ok"`) keep working. + - `Suggestion.Kind` is now typed as `tracker.SuggestionKind` (underlying string). +- `tracker diagnose` suggestion order is now deterministic (alphabetical by node ID). Previously suggestions printed in Go map-iteration order, which varied between runs. 
+ ### Fixed - **OpenAI Responses API: `function_call_output` and `function_call` items now always serialize required fields** (closes #114). Previously the shared `openaiInput` struct used `omitempty` on every field, so a tool returning an empty-string result produced `{"type":"function_call_output","call_id":"..."}` with no `output` field, and a no-argument tool call produced `function_call` with no `arguments`. OpenAI's endpoint tolerated this, but OpenRouter's strict Zod validator rejected the requests with `invalid_prompt` / `invalid_union` errors, symptomatic on GLM, Qwen, and Kimi via OpenRouter. Fixed by replacing the `omitempty`-tagged single struct with a `MarshalJSON` method that emits only fields valid per item type, with required fields always present. Reported by @Nopik. diff --git a/README.md b/README.md index 7f208c2..946ad26 100644 --- a/README.md +++ b/README.md @@ -574,9 +574,14 @@ result, _ := tracker.Run(ctx, source, tracker.Config{ ### Analyzing past runs from code ```go -import tracker "github.com/2389-research/tracker" +import ( + "context" -report, err := tracker.DiagnoseMostRecent(".") + tracker "github.com/2389-research/tracker" +) + +ctx := context.Background() +report, err := tracker.DiagnoseMostRecent(ctx, ".") if err != nil { log.Fatal(err) } for _, f := range report.Failures { @@ -588,7 +593,7 @@ for _, s := range report.Suggestions { } ``` -`tracker.Audit`, `tracker.Simulate`, and `tracker.Doctor` follow the same pattern and return JSON-serializable reports. +`tracker.Audit`, `tracker.DiagnoseMostRecent`, `tracker.Simulate`, and `tracker.Doctor` all accept `context.Context` as their first argument and return JSON-serializable reports. `tracker.ListRuns` and `DiagnoseMostRecent`/`Diagnose` accept an optional config (`AuditConfig`, `DiagnoseConfig`) with a `LogWriter` for non-fatal parse warnings — set it to `io.Discard` to silence warnings in embedded callers. 
`Audit` and `Simulate` currently take just `ctx` (plus their payload); `Doctor` takes a required `DoctorConfig` plus optional functional options (e.g., `tracker.WithVersionInfo`). To stream events programmatically in the same NDJSON format as `tracker --json`, use `tracker.NewNDJSONWriter`: diff --git a/cmd/tracker/audit.go b/cmd/tracker/audit.go index e4e56ad..562ff85 100644 --- a/cmd/tracker/audit.go +++ b/cmd/tracker/audit.go @@ -3,7 +3,9 @@ package main import ( + "context" "fmt" + "io" "time" tracker "github.com/2389-research/tracker" @@ -11,7 +13,7 @@ import ( // listRuns shows all available runs with their status and node count. func listRuns(workdir string) error { - runs, err := tracker.ListRuns(workdir) + runs, err := tracker.ListRuns(workdir, tracker.AuditConfig{LogWriter: io.Discard}) if err != nil { return err } @@ -55,7 +57,7 @@ func runAudit(workdir, runID string) error { if err != nil { return err } - report, err := tracker.Audit(runDir) + report, err := tracker.Audit(context.Background(), runDir) if err != nil { return err } diff --git a/cmd/tracker/diagnose.go b/cmd/tracker/diagnose.go index 292b7e6..e228508 100644 --- a/cmd/tracker/diagnose.go +++ b/cmd/tracker/diagnose.go @@ -3,7 +3,9 @@ package main import ( + "context" "fmt" + "io" "os" "sort" "strings" @@ -14,7 +16,7 @@ import ( // diagnoseMostRecent finds and diagnoses the most recent run. 
func diagnoseMostRecent(workdir string) error { - report, err := tracker.DiagnoseMostRecent(workdir) + report, err := tracker.DiagnoseMostRecent(context.Background(), workdir, tracker.DiagnoseConfig{LogWriter: io.Discard}) if err != nil { return err } @@ -28,7 +30,7 @@ func runDiagnose(workdir, runID string) error { if err != nil { return err } - report, err := tracker.Diagnose(runDir) + report, err := tracker.Diagnose(context.Background(), runDir, tracker.DiagnoseConfig{LogWriter: io.Discard}) if err != nil { return err } diff --git a/cmd/tracker/doctor.go b/cmd/tracker/doctor.go index 06fb520..583917b 100644 --- a/cmd/tracker/doctor.go +++ b/cmd/tracker/doctor.go @@ -4,6 +4,7 @@ package main import ( + "context" "fmt" "os" "path/filepath" @@ -73,14 +74,12 @@ func runDoctorWithConfig(workdir string, cfg DoctorConfig) error { workdir = wd } - report, err := tracker.Doctor(tracker.DoctorConfig{ + report, err := tracker.Doctor(context.Background(), tracker.DoctorConfig{ WorkDir: workdir, Backend: cfg.backend, ProbeProviders: cfg.probe, PipelineFile: cfg.pipelineFile, - TrackerVersion: version, - TrackerCommit: commit, - }) + }, tracker.WithVersionInfo(version, commit)) if err != nil { return err } diff --git a/cmd/tracker/simulate.go b/cmd/tracker/simulate.go index a1f1d83..084b644 100644 --- a/cmd/tracker/simulate.go +++ b/cmd/tracker/simulate.go @@ -3,6 +3,7 @@ package main import ( + "context" "fmt" "io" "os" @@ -58,7 +59,7 @@ func runSimulateCmd(pipelineFile, formatOverride string, w io.Writer) error { source = string(data) } - report, err := tracker.Simulate(source) + report, err := tracker.Simulate(context.Background(), source) if err != nil { return err } diff --git a/tracker_activity.go b/tracker_activity.go index 10733e1..5d6788e 100644 --- a/tracker_activity.go +++ b/tracker_activity.go @@ -159,7 +159,7 @@ func ParseActivityLine(line string) (ActivityEntry, bool) { if err := json.Unmarshal([]byte(line), &raw); err != nil { return ActivityEntry{}, false } - 
ts, ok := parseActivityTimestampLib(raw.Timestamp) + ts, ok := parseActivityTimestamp(raw.Timestamp) if !ok { return ActivityEntry{}, false } @@ -173,7 +173,7 @@ func ParseActivityLine(line string) (ActivityEntry, bool) { }, true } -func parseActivityTimestampLib(s string) (time.Time, bool) { +func parseActivityTimestamp(s string) (time.Time, bool) { if ts, err := time.Parse(time.RFC3339Nano, s); err == nil { return ts, true } diff --git a/tracker_audit.go b/tracker_audit.go index 5e14769..ce210c9 100644 --- a/tracker_audit.go +++ b/tracker_audit.go @@ -3,7 +3,9 @@ package tracker import ( + "context" "fmt" + "io" "os" "path/filepath" "sort" @@ -12,6 +14,15 @@ import ( "github.com/2389-research/tracker/pipeline" ) +// AuditConfig configures an Audit() or ListRuns() call. +type AuditConfig struct { + // LogWriter receives non-fatal warnings (unreadable activity.jsonl + // in a run directory, etc.). Nil is treated as io.Discard so + // embedded library callers do not see warnings on os.Stderr. The + // tracker CLI sets this to io.Discard for user-facing commands. + LogWriter io.Writer +} + // AuditReport is the structured result of Audit(). type AuditReport struct { RunID string `json:"run_id"` @@ -68,7 +79,24 @@ type RunSummary struct { // and activity.jsonl directly under it. For user-supplied input, resolve // the path via ResolveRunDir or use MostRecentRunID first, which enforce // the .tracker/runs/ layout. -func Audit(runDir string) (*AuditReport, error) { +// +// ctx is checked at entry so a caller that passes an already-cancelled +// context gets an immediate error instead of silent work. Full +// cancellation mid-parse would require threading ctx through +// pipeline.LoadCheckpoint and LoadActivityLog, which is out of scope +// today (both are fast and bounded). Nil is coalesced to +// context.Background(). +// +// Audit does not accept AuditConfig — it emits no warnings to suppress. 
+// Use ListRuns + AuditConfig{LogWriter} for bulk enumeration where the +// summary builder may skip unreadable activity logs. +func Audit(ctx context.Context, runDir string) (*AuditReport, error) { + if ctx == nil { + ctx = context.Background() + } + if err := ctx.Err(); err != nil { + return nil, err + } cp, err := pipeline.LoadCheckpoint(filepath.Join(runDir, "checkpoint.json")) if err != nil { return nil, fmt.Errorf("load checkpoint: %w", err) @@ -98,7 +126,9 @@ func Audit(runDir string) (*AuditReport, error) { } // ListRuns returns all runs under workdir/.tracker/runs, sorted newest first. -func ListRuns(workdir string) ([]RunSummary, error) { +func ListRuns(workdir string, opts ...AuditConfig) ([]RunSummary, error) { + cfg := firstAuditConfig(opts) + logW := logWriterOrDiscard(cfg.LogWriter) runsDir := filepath.Join(workdir, ".tracker", "runs") entries, err := os.ReadDir(runsDir) if err != nil { @@ -112,7 +142,7 @@ func ListRuns(workdir string) ([]RunSummary, error) { if !e.IsDir() { continue } - rs, ok := buildLibRunSummary(runsDir, e.Name()) + rs, ok := buildRunSummary(runsDir, e.Name(), logW) if ok { runs = append(runs, rs) } @@ -121,6 +151,13 @@ func ListRuns(workdir string) ([]RunSummary, error) { return runs, nil } +func firstAuditConfig(opts []AuditConfig) AuditConfig { + if len(opts) == 0 { + return AuditConfig{} + } + return opts[0] +} + func classifyStatus(cp *pipeline.Checkpoint, activity []ActivityEntry) string { for i := len(activity) - 1; i >= 0; i-- { switch activity[i].Type { @@ -213,7 +250,7 @@ func buildAuditRecommendations(cp *pipeline.Checkpoint, status string, total tim return recs } -func buildLibRunSummary(runsDir, name string) (RunSummary, bool) { +func buildRunSummary(runsDir, name string, logW io.Writer) (RunSummary, bool) { runDir := filepath.Join(runsDir, name) cp, err := pipeline.LoadCheckpoint(filepath.Join(runDir, "checkpoint.json")) if err != nil { @@ -221,7 +258,7 @@ func buildLibRunSummary(runsDir, name string) (RunSummary, 
bool) { } activity, lerr := LoadActivityLog(runDir) if lerr != nil { - fmt.Fprintf(os.Stderr, "warning: run %s: cannot read activity log: %v\n", name, lerr) + fmt.Fprintf(logW, "warning: run %s: cannot read activity log: %v\n", name, lerr) activity = nil // continue with nil so the summary still builds } SortActivityByTime(activity) diff --git a/tracker_audit_test.go b/tracker_audit_test.go index 5ba0f88..de768dd 100644 --- a/tracker_audit_test.go +++ b/tracker_audit_test.go @@ -1,13 +1,15 @@ package tracker import ( + "bytes" + "context" "os" "path/filepath" "testing" ) func TestAudit_CompletedRun(t *testing.T) { - r, err := Audit("testdata/runs/ok") + r, err := Audit(context.Background(), "testdata/runs/ok") if err != nil { t.Fatalf("Audit: %v", err) } @@ -23,7 +25,7 @@ func TestAudit_CompletedRun(t *testing.T) { } func TestAudit_FailedRun(t *testing.T) { - r, err := Audit("testdata/runs/failed") + r, err := Audit(context.Background(), "testdata/runs/failed") if err != nil { t.Fatalf("Audit: %v", err) } @@ -65,3 +67,40 @@ func TestListRuns_MultipleRuns(t *testing.T) { t.Errorf("first = %q, want r2 (newest first)", runs[0].RunID) } } + +func TestListRuns_LogWriterSilencesWarnings(t *testing.T) { + // Build a run directory whose checkpoint loads fine but whose activity.jsonl + // is unreadable (EISDIR). buildRunSummary should emit a warning to the + // LogWriter rather than os.Stderr. + workdir := t.TempDir() + runsDir := filepath.Join(workdir, ".tracker", "runs") + must(t, os.MkdirAll(filepath.Join(runsDir, "r1"), 0o755)) + must(t, os.WriteFile(filepath.Join(runsDir, "r1", "checkpoint.json"), + []byte(`{"run_id":"r1","completed_nodes":["A"],"timestamp":"2026-04-17T10:00:00Z"}`), 0o644)) + // Make activity.jsonl a directory so os.ReadFile fails with EISDIR. 
+ must(t, os.MkdirAll(filepath.Join(runsDir, "r1", "activity.jsonl"), 0o755)) + + var logBuf bytes.Buffer + runs, err := ListRuns(workdir, AuditConfig{LogWriter: &logBuf}) + if err != nil { + t.Fatalf("ListRuns: %v", err) + } + if len(runs) != 1 { + t.Fatalf("got %d runs, want 1", len(runs)) + } + if logBuf.Len() == 0 { + t.Error("expected log writer to receive a warning about activity.jsonl") + } +} + +// TestAudit_CtxCancelledAtEntry verifies Audit returns the caller's +// cancellation error immediately rather than silently proceeding with the +// expensive checkpoint + activity log reads. +func TestAudit_CtxCancelledAtEntry(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, err := Audit(ctx, "testdata/runs/ok") + if err != context.Canceled { + t.Errorf("err = %v, want context.Canceled", err) + } +} diff --git a/tracker_diagnose.go b/tracker_diagnose.go index c1fd421..c3e6eda 100644 --- a/tracker_diagnose.go +++ b/tracker_diagnose.go @@ -3,8 +3,11 @@ package tracker import ( + "bufio" + "context" "encoding/json" "fmt" + "io" "os" "path/filepath" "sort" @@ -14,6 +17,18 @@ import ( "github.com/2389-research/tracker/pipeline" ) +// DiagnoseConfig configures a Diagnose() run. +type DiagnoseConfig struct { + // LogWriter receives non-fatal parse/read warnings — specifically + // malformed status.json content (one warning per bad file) and + // bufio.Scanner errors while reading activity.jsonl (e.g. lines + // exceeding the 1 MB buffer limit, I/O failures). Nil is treated + // as io.Discard so library callers do not see stray warnings on + // os.Stderr. The tracker CLI sets this to io.Discard for user- + // facing commands. + LogWriter io.Writer +} + // DiagnoseReport is the structured output of Diagnose / DiagnoseMostRecent. 
type DiagnoseReport struct { RunID string `json:"run_id"` @@ -50,22 +65,27 @@ type BudgetHalt struct { Message string `json:"message"` } +// SuggestionKind is the typed string identifying which template produced a +// Suggestion. The underlying string values are stable; new kinds may be +// added additively. +type SuggestionKind string + // Suggestion is an actionable recommendation produced by Diagnose. type Suggestion struct { - NodeID string `json:"node_id,omitempty"` - Kind string `json:"kind"` - Message string `json:"message"` + NodeID string `json:"node_id,omitempty"` + Kind SuggestionKind `json:"kind"` + Message string `json:"message"` } // Suggestion kinds (stable; new ones may be added additively). const ( - SuggestionRetryPattern = "retry_pattern" - SuggestionEscalateLimit = "escalate_limit" - SuggestionNoOutput = "no_output" - SuggestionShellCommand = "shell_command" - SuggestionGoTest = "go_test" - SuggestionSuspiciousTiming = "suspicious_timing" - SuggestionBudget = "budget" + SuggestionRetryPattern SuggestionKind = "retry_pattern" + SuggestionEscalateLimit SuggestionKind = "escalate_limit" + SuggestionNoOutput SuggestionKind = "no_output" + SuggestionShellCommand SuggestionKind = "shell_command" + SuggestionGoTest SuggestionKind = "go_test" + SuggestionSuspiciousTiming SuggestionKind = "suspicious_timing" + SuggestionBudget SuggestionKind = "budget" ) // Diagnose analyzes a run directory and returns a structured report. @@ -75,7 +95,18 @@ const ( // under it. For user-supplied input, resolve the path via // ResolveRunDir or DiagnoseMostRecent first, which enforce the // .tracker/runs/ layout. -func Diagnose(runDir string) (*DiagnoseReport, error) { +// +// If ctx is cancelled mid-parse, Diagnose returns ctx.Err() — a partial +// report is never returned as a success, so callers using deadlines can +// distinguish complete from truncated analysis. A nil ctx is treated as +// context.Background() (no cancellation possible). 
+func Diagnose(ctx context.Context, runDir string, opts ...DiagnoseConfig) (*DiagnoseReport, error) { + if ctx == nil { + ctx = context.Background() + } + cfg := firstDiagnoseConfig(opts) + logW := logWriterOrDiscard(cfg.LogWriter) + cpPath := filepath.Join(runDir, "checkpoint.json") cp, err := pipeline.LoadCheckpoint(cpPath) if err != nil { @@ -85,25 +116,43 @@ func Diagnose(runDir string) (*DiagnoseReport, error) { RunID: cp.RunID, CompletedNodes: len(cp.CompletedNodes), } - failures := collectNodeFailuresLib(runDir) - report.BudgetHalt = enrichFromActivityLib(runDir, failures) + failures := collectNodeFailures(runDir, logW) + halt, err := enrichFromActivity(ctx, runDir, failures, logW) + if err != nil { + return nil, err + } + report.BudgetHalt = halt report.Failures = sortedFailures(failures) report.Suggestions = buildSuggestions(report.Failures, report.BudgetHalt) return report, nil } // DiagnoseMostRecent finds the most recent run under workdir and diagnoses it. -func DiagnoseMostRecent(workdir string) (*DiagnoseReport, error) { +func DiagnoseMostRecent(ctx context.Context, workdir string, opts ...DiagnoseConfig) (*DiagnoseReport, error) { id, err := MostRecentRunID(workdir) if err != nil { return nil, err } - return Diagnose(filepath.Join(workdir, ".tracker", "runs", id)) + return Diagnose(ctx, filepath.Join(workdir, ".tracker", "runs", id), opts...) 
+} + +func firstDiagnoseConfig(opts []DiagnoseConfig) DiagnoseConfig { + if len(opts) == 0 { + return DiagnoseConfig{} + } + return opts[0] +} + +func logWriterOrDiscard(w io.Writer) io.Writer { + if w == nil { + return io.Discard + } + return w } // ----- internals ----- -func collectNodeFailuresLib(runDir string) map[string]*NodeFailure { +func collectNodeFailures(runDir string, logW io.Writer) map[string]*NodeFailure { failures := make(map[string]*NodeFailure) entries, err := os.ReadDir(runDir) if err != nil { @@ -113,14 +162,14 @@ func collectNodeFailuresLib(runDir string) map[string]*NodeFailure { if !e.IsDir() { continue } - if f := loadNodeFailureLib(runDir, e.Name()); f != nil { + if f := loadNodeFailure(runDir, e.Name(), logW); f != nil { failures[e.Name()] = f } } return failures } -func loadNodeFailureLib(runDir, nodeID string) *NodeFailure { +func loadNodeFailure(runDir, nodeID string, logW io.Writer) *NodeFailure { statusPath := filepath.Join(runDir, nodeID, "status.json") data, err := os.ReadFile(statusPath) if err != nil { @@ -131,7 +180,7 @@ func loadNodeFailureLib(runDir, nodeID string) *NodeFailure { ContextUpdates map[string]string `json:"context_updates"` } if err := json.Unmarshal(data, &status); err != nil { - fmt.Fprintf(os.Stderr, "warning: cannot parse %s: %v\n", statusPath, err) + fmt.Fprintf(logW, "warning: cannot parse %s: %v\n", statusPath, err) return nil } if status.Outcome != "fail" { @@ -145,8 +194,8 @@ func loadNodeFailureLib(runDir, nodeID string) *NodeFailure { return f } -// diagnoseEntryLib is a parsed activity.jsonl line with fields needed for diagnosis. -type diagnoseEntryLib struct { +// diagnoseEntry is a parsed activity.jsonl line with fields needed for diagnosis. 
+type diagnoseEntry struct { Timestamp string `json:"ts"` Type string `json:"type"` NodeID string `json:"node_id"` @@ -159,27 +208,39 @@ type diagnoseEntryLib struct { WallElapsedMs int64 `json:"wall_elapsed_ms"` } -func enrichFromActivityLib(runDir string, failures map[string]*NodeFailure) *BudgetHalt { +// enrichFromActivity streams activity.jsonl, populating failures + detecting +// budget halt events. Returns (nil, nil) if activity.jsonl does not exist +// (runs that never started). Returns ctx.Err() if cancellation fires +// mid-parse, and scanner.Err() if the scanner aborts (buffer overflow at +// 1 MB, I/O error) — both surface truncation to the caller so partial +// analysis is never silently treated as authoritative. +func enrichFromActivity(ctx context.Context, runDir string, failures map[string]*NodeFailure, logW io.Writer) (*BudgetHalt, error) { path := filepath.Join(runDir, "activity.jsonl") - data, err := os.ReadFile(path) + f, err := os.Open(path) if err != nil { - return nil + if os.IsNotExist(err) { + return nil, nil + } + return nil, fmt.Errorf("open activity log: %w", err) } + defer f.Close() + stageStarts := map[string]time.Time{} failSignatures := map[string][]string{} - halt := parseActivityLinesForDiagnose(string(data), failures, stageStarts, failSignatures) - applyRetryAnalysisLib(failures, failSignatures) - return halt -} - -func parseActivityLinesForDiagnose(data string, failures map[string]*NodeFailure, stageStarts map[string]time.Time, failSignatures map[string][]string) *BudgetHalt { var halt *BudgetHalt - for _, line := range strings.Split(data, "\n") { - line = strings.TrimSpace(line) + + scanner := bufio.NewScanner(f) + // Match LoadActivityLog: allow 1 MB lines. 
+ scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) + for scanner.Scan() { + if err := ctx.Err(); err != nil { + return nil, err + } + line := strings.TrimSpace(scanner.Text()) if line == "" { continue } - var entry diagnoseEntryLib + var entry diagnoseEntry if err := json.Unmarshal([]byte(line), &entry); err != nil { continue } @@ -191,24 +252,29 @@ func parseActivityLinesForDiagnose(data string, failures map[string]*NodeFailure Message: entry.Message, } } - enrichFromEntryNF(entry, failures, stageStarts, failSignatures) + enrichFromEntry(entry, failures, stageStarts, failSignatures) + } + if err := scanner.Err(); err != nil { + fmt.Fprintf(logW, "warning: activity log scanner stopped at %s: %v\n", path, err) + return nil, fmt.Errorf("scan activity log: %w", err) } - return halt + applyRetryAnalysis(failures, failSignatures) + return halt, nil } -func enrichFromEntryNF(entry diagnoseEntryLib, failures map[string]*NodeFailure, stageStarts map[string]time.Time, failSignatures map[string][]string) { - ts, _ := parseActivityTimestampLib(entry.Timestamp) +func enrichFromEntry(entry diagnoseEntry, failures map[string]*NodeFailure, stageStarts map[string]time.Time, failSignatures map[string][]string) { + ts, _ := parseActivityTimestamp(entry.Timestamp) switch entry.Type { case "stage_started": if !ts.IsZero() { stageStarts[entry.NodeID] = ts } case "stage_failed": - updateFailureTimingNF(failures[entry.NodeID], stageStarts, entry, ts) + updateFailureTiming(failures[entry.NodeID], stageStarts, entry, ts) sig := entry.Error + "\x00" + entry.ToolErr failSignatures[entry.NodeID] = append(failSignatures[entry.NodeID], sig) case "stage_completed": - updateFailureTimingNF(failures[entry.NodeID], stageStarts, entry, ts) + updateFailureTiming(failures[entry.NodeID], stageStarts, entry, ts) } if entry.NodeID == "" { return @@ -225,7 +291,7 @@ func enrichFromEntryNF(entry diagnoseEntryLib, failures map[string]*NodeFailure, } } -func updateFailureTimingNF(f *NodeFailure, 
stageStarts map[string]time.Time, entry diagnoseEntryLib, ts time.Time) { +func updateFailureTiming(f *NodeFailure, stageStarts map[string]time.Time, entry diagnoseEntry, ts time.Time) { if f == nil { return } @@ -237,7 +303,7 @@ func updateFailureTimingNF(f *NodeFailure, stageStarts map[string]time.Time, ent } } -func applyRetryAnalysisLib(failures map[string]*NodeFailure, failSignatures map[string][]string) { +func applyRetryAnalysis(failures map[string]*NodeFailure, failSignatures map[string][]string) { for nodeID, sigs := range failSignatures { f, ok := failures[nodeID] if !ok { diff --git a/tracker_diagnose_test.go b/tracker_diagnose_test.go index cbd8961..7d5a26c 100644 --- a/tracker_diagnose_test.go +++ b/tracker_diagnose_test.go @@ -1,11 +1,12 @@ package tracker import ( + "context" "testing" ) func TestDiagnose_CleanRun(t *testing.T) { - r, err := Diagnose("testdata/runs/ok") + r, err := Diagnose(context.Background(), "testdata/runs/ok") if err != nil { t.Fatalf("Diagnose: %v", err) } @@ -24,7 +25,7 @@ func TestDiagnose_CleanRun(t *testing.T) { } func TestDiagnose_FailureWithRetries(t *testing.T) { - r, err := Diagnose("testdata/runs/failed") + r, err := Diagnose(context.Background(), "testdata/runs/failed") if err != nil { t.Fatalf("Diagnose: %v", err) } @@ -44,7 +45,7 @@ func TestDiagnose_FailureWithRetries(t *testing.T) { if f.Handler != "tool" { t.Errorf("handler = %q", f.Handler) } - kinds := map[string]bool{} + kinds := map[SuggestionKind]bool{} for _, s := range r.Suggestions { kinds[s.Kind] = true } @@ -57,7 +58,7 @@ func TestDiagnose_FailureWithRetries(t *testing.T) { } func TestDiagnose_BudgetHalt(t *testing.T) { - r, err := Diagnose("testdata/runs/budget_halted") + r, err := Diagnose(context.Background(), "testdata/runs/budget_halted") if err != nil { t.Fatalf("Diagnose: %v", err) } @@ -71,3 +72,18 @@ func TestDiagnose_BudgetHalt(t *testing.T) { t.Error("empty breach message") } } + +// TestDiagnose_CtxCancelled verifies that a cancelled context 
propagates +// out of Diagnose — a partial report is never returned as a success, so +// automation with deadlines can distinguish complete from truncated output. +func TestDiagnose_CtxCancelled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel before the call + _, err := Diagnose(ctx, "testdata/runs/failed") + if err == nil { + t.Fatal("expected ctx.Err() to propagate, got nil") + } + if err != context.Canceled { + t.Errorf("err = %v, want context.Canceled", err) + } +} diff --git a/tracker_doctor.go b/tracker_doctor.go index 42ab4cd..ffd31a9 100644 --- a/tracker_doctor.go +++ b/tracker_doctor.go @@ -37,11 +37,27 @@ type DoctorConfig struct { // PipelineFile, when non-empty, adds a "Pipeline File" check that parses // and validates the given .dip / .dot file. PipelineFile string - // TrackerVersion and TrackerCommit are surfaced in the "Version - // Compatibility" check. They are populated by the CLI from build-time - // ldflags; library callers may leave them empty. - TrackerVersion string - TrackerCommit string + // versionInfo is populated by WithVersionInfo. Unexported so callers + // use the functional option rather than setting CLI-specific fields. + versionInfo versionInfo +} + +// versionInfo carries CLI-provided build metadata into a Doctor run. +type versionInfo struct { + version string + commit string +} + +// DoctorOption configures a Doctor run via a functional option. +type DoctorOption func(*DoctorConfig) + +// WithVersionInfo attaches a tracker version and commit hash for display in +// the "Version Compatibility" check. CLI callers populate these from +// build-time ldflags; library callers typically do not need this. +func WithVersionInfo(version, commit string) DoctorOption { + return func(c *DoctorConfig) { + c.versionInfo = versionInfo{version: version, commit: commit} + } } // DoctorReport is the structured result of a Doctor() call. 
@@ -52,10 +68,25 @@ type DoctorReport struct { Errors int `json:"errors"` } +// CheckStatus is the status of a CheckResult or CheckDetail. Enum-like +// typed string so consumers can switch-exhaust. "hint" is only valid on +// CheckDetail.Status (informational sub-items such as optional providers +// not configured). +type CheckStatus string + +// CheckStatus values. +const ( + CheckStatusOK CheckStatus = "ok" + CheckStatusWarn CheckStatus = "warn" + CheckStatusError CheckStatus = "error" + CheckStatusSkip CheckStatus = "skip" + CheckStatusHint CheckStatus = "hint" +) + // CheckResult is one section of a DoctorReport. type CheckResult struct { Name string `json:"name"` - Status string `json:"status"` // "ok" | "warn" | "error" | "skip" + Status CheckStatus `json:"status"` // "ok" | "warn" | "error" | "skip" Message string `json:"message,omitempty"` Hint string `json:"hint,omitempty"` Details []CheckDetail `json:"details,omitempty"` @@ -64,9 +95,9 @@ type CheckResult struct { // CheckDetail is one sub-line within a CheckResult — used for per-item // status lines (per-provider, per-binary, per-subdirectory). type CheckDetail struct { - Status string `json:"status"` // "ok" | "warn" | "error" | "hint" — "hint" is used for informational sub-items (e.g. optional providers not configured) - Message string `json:"message"` - Hint string `json:"hint,omitempty"` + Status CheckStatus `json:"status"` // "ok" | "warn" | "error" | "hint" + Message string `json:"message"` + Hint string `json:"hint,omitempty"` } // Doctor runs a suite of preflight checks and returns a structured report. @@ -78,10 +109,23 @@ type CheckDetail struct { // flag; library callers should leave it false unless they specifically // want live credential verification. // +// Provider probes and binary version lookups honor ctx: cancelling the +// context aborts in-flight checks. A nil context is treated as +// context.Background(). 
+// // Write side effects (gitignore fix-up, workdir creation prompts) are NOT // performed by Doctor — callers inspect the report and apply any fixes // themselves. -func Doctor(cfg DoctorConfig) (*DoctorReport, error) { +func Doctor(ctx context.Context, cfg DoctorConfig, opts ...DoctorOption) (*DoctorReport, error) { + if ctx == nil { + ctx = context.Background() + } + for _, opt := range opts { + if opt == nil { + continue + } + opt(&cfg) + } if cfg.WorkDir == "" { wd, err := os.Getwd() if err != nil { @@ -92,25 +136,25 @@ func Doctor(cfg DoctorConfig) (*DoctorReport, error) { r := &DoctorReport{} r.Checks = append(r.Checks, - checkEnvWarningsLib(), - checkProvidersLib(cfg.ProbeProviders), - checkDippinLib(), - checkVersionCompatLib(cfg.TrackerVersion, cfg.TrackerCommit), - checkOtherBinariesLib(cfg.Backend), - checkWorkdirLib(cfg.WorkDir), - checkArtifactDirsLib(cfg.WorkDir), - checkDiskSpaceLib(cfg.WorkDir), + checkEnvWarnings(), + checkProviders(ctx, cfg.ProbeProviders), + checkDippin(ctx), + checkVersionCompat(ctx, cfg.versionInfo.version, cfg.versionInfo.commit), + checkOtherBinaries(ctx, cfg.Backend), + checkWorkdir(cfg.WorkDir), + checkArtifactDirs(cfg.WorkDir), + checkDiskSpace(cfg.WorkDir), ) if cfg.PipelineFile != "" { - r.Checks = append(r.Checks, checkPipelineFileLib(cfg.PipelineFile)) + r.Checks = append(r.Checks, checkPipelineFile(cfg.PipelineFile)) } r.OK = true for _, c := range r.Checks { switch c.Status { - case "warn": + case CheckStatusWarn: r.Warnings++ - case "error": + case CheckStatusError: r.Errors++ r.OK = false } @@ -118,8 +162,8 @@ func Doctor(cfg DoctorConfig) (*DoctorReport, error) { return r, nil } -// checkEnvWarningsLib warns when opt-in security overrides are active. -func checkEnvWarningsLib() CheckResult { +// checkEnvWarnings warns when opt-in security overrides are active. 
+func checkEnvWarnings() CheckResult { dangerousVars := map[string]string{ "TRACKER_PASS_ENV": "passes all env vars to tool subprocesses (security risk)", "TRACKER_PASS_API_KEYS": "passes API keys to tool subprocesses (security risk)", @@ -131,11 +175,11 @@ func checkEnvWarningsLib() CheckResult { } } if len(found) == 0 { - return CheckResult{Name: "Environment Warnings", Status: "ok", Message: "no dangerous environment variables detected"} + return CheckResult{Name: "Environment Warnings", Status: CheckStatusOK, Message: "no dangerous environment variables detected"} } return CheckResult{ Name: "Environment Warnings", - Status: "warn", + Status: CheckStatusWarn, Message: fmt.Sprintf("dangerous variables set: %s", strings.Join(found, "; ")), Hint: "unset TRACKER_PASS_ENV and TRACKER_PASS_API_KEYS to restore default security posture", } @@ -199,9 +243,9 @@ var knownProviders = []providerDef{ }, } -// checkProvidersLib reports on each configured LLM provider. When probe +// checkProviders reports on each configured LLM provider. When probe // is true, a 1-token API call verifies auth for each configured provider. 
-func checkProvidersLib(probe bool) CheckResult { +func checkProviders(ctx context.Context, probe bool) CheckResult { out := CheckResult{Name: "LLM Providers"} var configuredNames []string var missingNames []string @@ -216,7 +260,7 @@ func checkProvidersLib(probe bool) CheckResult { masked := maskKey(key) if !isValidAPIKey(p.name, key) { out.Details = append(out.Details, CheckDetail{ - Status: "error", + Status: CheckStatusError, Message: fmt.Sprintf("%-15s %s=%s (invalid format)", p.name, envName, masked), Hint: fmt.Sprintf("%s keys should match expected format — run `tracker setup`", p.name), }) @@ -224,23 +268,29 @@ func checkProvidersLib(probe bool) CheckResult { continue } if probe && p.buildAdapter != nil { - authOk, authMsg := probeProvider(p, key) - if !authOk { - out.Details = append(out.Details, CheckDetail{ - Status: "error", - Message: fmt.Sprintf("%-15s %s=%s (auth failed: %s)", p.name, envName, masked, authMsg), - Hint: fmt.Sprintf("your %s key is invalid or expired — export a fresh key or run `tracker setup`", p.name), - }) + ok, probeMsg, isAuth := probeProvider(ctx, p, key) + if !ok { + detail := CheckDetail{Status: CheckStatusError} + if isAuth { + detail.Message = fmt.Sprintf("%-15s %s=%s (auth failed: %s)", p.name, envName, masked, probeMsg) + detail.Hint = fmt.Sprintf("your %s key is invalid or expired — export a fresh key or run `tracker setup`", p.name) + } else { + // DNS, timeout, transport, context cancel, or other non-auth failure. + // Do NOT tell the user to rotate a working key. 
+ detail.Message = fmt.Sprintf("%-15s %s=%s (probe failed: %s)", p.name, envName, masked, probeMsg) + detail.Hint = fmt.Sprintf("probe for %s failed on network/transport — verify connectivity and %s_BASE_URL before rotating keys", p.name, strings.ToUpper(p.name)) + } + out.Details = append(out.Details, detail) hasProviderErrors = true continue } out.Details = append(out.Details, CheckDetail{ - Status: "ok", + Status: CheckStatusOK, Message: fmt.Sprintf("%-15s %s=%s (auth verified)", p.name, envName, masked), }) } else { out.Details = append(out.Details, CheckDetail{ - Status: "ok", + Status: CheckStatusOK, Message: fmt.Sprintf("%-15s %s=%s", p.name, envName, masked), }) } @@ -252,14 +302,14 @@ func checkProvidersLib(probe bool) CheckResult { for _, pd := range knownProviders { if pd.name == name { out.Details = append(out.Details, CheckDetail{ - Status: "error", + Status: CheckStatusError, Message: fmt.Sprintf("%-15s %s not set", pd.name, pd.envVars[0]), }) break } } } - out.Status = "error" + out.Status = CheckStatusError out.Message = "no LLM providers configured" out.Hint = "run `tracker setup` or export ANTHROPIC_API_KEY / OPENAI_API_KEY / GEMINI_API_KEY" return out @@ -269,15 +319,15 @@ func checkProvidersLib(probe bool) CheckResult { // "not configured" is informational when at least one provider works — // rendered as a hint line, not an error or warning, so Status=hint. 
out.Details = append(out.Details, CheckDetail{ - Status: "hint", + Status: CheckStatusHint, Message: fmt.Sprintf("not configured: %s (optional)", strings.Join(missingNames, ", ")), }) } if hasProviderErrors { - out.Status = "warn" + out.Status = CheckStatusWarn } else { - out.Status = "ok" + out.Status = CheckStatusOK } if probe { out.Message = fmt.Sprintf("%d provider(s) configured and auth verified", len(configuredNames)) @@ -296,17 +346,20 @@ func findProviderKey(envVars []string) (key, envName string) { return "", "" } -func probeProvider(p providerDef, key string) (bool, string) { +// probeProvider returns (ok, msg, isAuthFailure). The third return lets the +// caller distinguish an actual auth failure (rotate-the-key guidance) from +// a network/transport/timeout failure (don't rotate good keys). +func probeProvider(ctx context.Context, p providerDef, key string) (bool, string, bool) { adapter, err := p.buildAdapter(key) if err != nil { - return false, fmt.Sprintf("build adapter: %v", err) + return false, fmt.Sprintf("build adapter: %v", err), false } client, err := llm.NewClient(llm.WithProvider(adapter)) if err != nil { - return false, fmt.Sprintf("create client: %v", err) + return false, fmt.Sprintf("create client: %v", err), false } defer client.Close() - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + probeCtx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() maxTok := 1 req := &llm.Request{ @@ -314,15 +367,39 @@ func probeProvider(p providerDef, key string) (bool, string) { Messages: []llm.Message{llm.UserMessage("ping")}, MaxTokens: &maxTok, } - _, err = client.Complete(ctx, req) + _, err = client.Complete(probeCtx, req) if err != nil { msg := err.Error() if isAuthError(msg) { - return false, "invalid or expired API key" + return false, "invalid or expired API key", true } - return false, trimErrMsg(msg, 80) + // Sanitize FIRST, then trim. 
If we trim first, a key that + // straddles the 80-char boundary gets cut into a shorter prefix + // that no longer matches the regex, leaking the prefix. Sanitize + // the full message, then trim whatever's left. + return false, trimErrMsg(sanitizeProviderError(msg), 80), false + } + return true, "", false +} + +// sanitizeProviderError strips API keys and bearer tokens from provider error +// text so they never land in CheckDetail.Message (which library consumers may +// log or forward to webhooks). +var ( + apiKeyPatterns = []*regexp.Regexp{ + regexp.MustCompile(`sk-ant-[A-Za-z0-9_\-]{6,}`), + regexp.MustCompile(`sk-[A-Za-z0-9_\-]{10,}`), + regexp.MustCompile(`AIza[0-9A-Za-z_\-]{20,}`), + } + bearerPattern = regexp.MustCompile(`(?i)bearer\s+[A-Za-z0-9._\-]{6,}`) +) + +func sanitizeProviderError(msg string) string { + for _, re := range apiKeyPatterns { + msg = re.ReplaceAllString(msg, "[redacted-key]") } - return true, "" + msg = bearerPattern.ReplaceAllString(msg, "Bearer [redacted]") + return msg } func isAuthError(msg string) bool { @@ -342,38 +419,38 @@ func trimErrMsg(msg string, maxLen int) string { return msg[:maxLen] + "..." } -// checkDippinLib verifies the dippin binary is installed. The full "dippin +// checkDippin verifies the dippin binary is installed. The full "dippin // at " string goes into the details so the CLI can print a // per-item line; the composite summary carries the shorter "dippin " // form. Historically the CLI emits both lines. 
-func checkDippinLib() CheckResult { +func checkDippin(ctx context.Context) CheckResult { path, err := exec.LookPath("dippin") if err != nil { return CheckResult{ Name: "Dippin Language", - Status: "error", + Status: CheckStatusError, Message: "dippin binary not found in PATH", Hint: "install from https://github.com/2389-research/dippin-lang (required for pipeline linting)", } } - ver := getDippinVersion(path) + ver := getDippinVersion(ctx, path) return CheckResult{ Name: "Dippin Language", - Status: "ok", + Status: CheckStatusOK, Details: []CheckDetail{{ - Status: "ok", + Status: CheckStatusOK, Message: fmt.Sprintf("dippin %s at %s", ver, path), }}, Message: fmt.Sprintf("dippin %s", ver), } } -func getDippinVersion(path string) string { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) +func getDippinVersion(ctx context.Context, path string) string { + probeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - out, err := exec.CommandContext(ctx, path, "--version").CombinedOutput() + out, err := exec.CommandContext(probeCtx, path, "--version").CombinedOutput() if err != nil { - out, err = exec.CommandContext(ctx, path, "version").CombinedOutput() + out, err = exec.CommandContext(probeCtx, path, "version").CombinedOutput() if err != nil { return "(version unknown)" } @@ -387,25 +464,25 @@ func getDippinVersion(path string) string { return ver } -// checkVersionCompatLib verifies the installed dippin version matches the +// checkVersionCompat verifies the installed dippin version matches the // go.mod pin (on major and minor). trackerVersion / trackerCommit, when // non-empty, are surfaced as a detail line. 
-func checkVersionCompatLib(trackerVersion, trackerCommit string) CheckResult { +func checkVersionCompat(ctx context.Context, trackerVersion, trackerCommit string) CheckResult { out := CheckResult{Name: "Version Compatibility"} if trackerVersion != "" { msg := fmt.Sprintf("tracker %s", trackerVersion) if trackerCommit != "" { msg = fmt.Sprintf("tracker %s (commit %s)", trackerVersion, trackerCommit) } - out.Details = append(out.Details, CheckDetail{Status: "ok", Message: msg}) + out.Details = append(out.Details, CheckDetail{Status: CheckStatusOK, Message: msg}) } dippinPath, err := exec.LookPath("dippin") if err != nil { out.Details = append(out.Details, CheckDetail{ - Status: "warn", + Status: CheckStatusWarn, Message: "dippin not found — skipping version compatibility check", }) - out.Status = "warn" + out.Status = CheckStatusWarn if trackerVersion != "" { out.Message = fmt.Sprintf("tracker %s / dippin not found", trackerVersion) } else { @@ -413,18 +490,18 @@ func checkVersionCompatLib(trackerVersion, trackerCommit string) CheckResult { } return out } - cliVer := getDippinVersion(dippinPath) + cliVer := getDippinVersion(ctx, dippinPath) out.Details = append(out.Details, CheckDetail{ - Status: "ok", + Status: CheckStatusOK, Message: fmt.Sprintf("dippin %s (installed) / %s (go.mod pin)", cliVer, PinnedDippinVersion), }) if mismatch, msg := checkDippinVersionMismatch(cliVer, PinnedDippinVersion); mismatch { out.Details = append(out.Details, CheckDetail{ - Status: "warn", + Status: CheckStatusWarn, Message: fmt.Sprintf("dippin version mismatch: %s", msg), }) - out.Status = "warn" + out.Status = CheckStatusWarn if trackerVersion != "" { out.Message = fmt.Sprintf("tracker %s / dippin %s (mismatched — expected %s)", trackerVersion, cliVer, PinnedDippinVersion) } else { @@ -433,7 +510,7 @@ func checkVersionCompatLib(trackerVersion, trackerCommit string) CheckResult { out.Hint = fmt.Sprintf("install dippin %s to match the go.mod pin", PinnedDippinVersion) return out } - 
out.Status = "ok" + out.Status = CheckStatusOK if trackerVersion != "" { out.Message = fmt.Sprintf("tracker %s / dippin %s", trackerVersion, cliVer) } else { @@ -471,63 +548,65 @@ func parseVersionMajorMinor(ver string) (major, minor int, ok bool) { return major, minor, true } -// checkOtherBinariesLib checks for git (recommended) and claude (required +// checkOtherBinaries checks for git (recommended) and claude (required // when backend == "claude-code", optional otherwise). -func checkOtherBinariesLib(backend string) CheckResult { +func checkOtherBinaries(ctx context.Context, backend string) CheckResult { out := CheckResult{Name: "Optional Binaries"} hasErr := false hasWarn := false if _, err := exec.LookPath("git"); err == nil { out.Details = append(out.Details, CheckDetail{ - Status: "ok", + Status: CheckStatusOK, Message: "git found (recommended for pipeline versioning)", }) } else { out.Details = append(out.Details, CheckDetail{ - Status: "warn", + Status: CheckStatusWarn, Message: "git not found in PATH (recommended for pipeline versioning)", }) hasWarn = true } claudePath, claudeErr := exec.LookPath("claude") if claudeErr == nil { - claudeVer := getBinaryVersion(claudePath, "--version") + claudeVer := getBinaryVersion(ctx, claudePath, "--version") out.Details = append(out.Details, CheckDetail{ - Status: "ok", + Status: CheckStatusOK, Message: fmt.Sprintf("claude %s (for --backend claude-code)", claudeVer), }) } else if backend == "claude-code" { out.Details = append(out.Details, CheckDetail{ - Status: "error", + Status: CheckStatusError, Message: "claude CLI not found in PATH (required for --backend claude-code)", Hint: "install the Claude CLI from https://claude.ai/code", }) hasErr = true } else { out.Details = append(out.Details, CheckDetail{ - Status: "warn", + Status: CheckStatusWarn, Message: "claude not found in PATH (install for --backend claude-code support)", }) hasWarn = true } switch { case hasErr: - out.Status = "error" + out.Status = 
CheckStatusError out.Message = "required binary missing for selected backend" out.Hint = "install the Claude CLI from https://claude.ai/code" case hasWarn: - out.Status = "warn" + out.Status = CheckStatusWarn out.Message = "some optional binaries missing" out.Hint = "install git and/or the Claude CLI to unlock all tracker features" default: - out.Status = "ok" + out.Status = CheckStatusOK out.Message = "optional binaries available" } return out } -func getBinaryVersion(path, flag string) string { - out, err := exec.Command(path, flag).CombinedOutput() +func getBinaryVersion(ctx context.Context, path, flag string) string { + probeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + out, err := exec.CommandContext(probeCtx, path, flag).CombinedOutput() if err != nil { return "(version unknown)" } @@ -538,27 +617,36 @@ func getBinaryVersion(path, flag string) string { return strings.TrimSpace(lines[0]) } -// checkWorkdirLib verifies the working directory exists and is writable. +// checkWorkdir verifies the working directory exists and is writable. // It also detects missing .gitignore entries but does NOT modify the file — // the CLI applies any fix-up separately. 
-func checkWorkdirLib(workdir string) CheckResult { +func checkWorkdir(workdir string) CheckResult { out := CheckResult{Name: "Working Directory"} info, err := os.Stat(workdir) if err != nil { - out.Status = "error" - out.Message = fmt.Sprintf("%s does not exist", workdir) - out.Hint = fmt.Sprintf("create the directory: mkdir -p %s", workdir) + out.Status = CheckStatusError + switch { + case os.IsNotExist(err): + out.Message = fmt.Sprintf("%s does not exist", workdir) + out.Hint = fmt.Sprintf("create the directory: mkdir -p %s", workdir) + case os.IsPermission(err): + out.Message = fmt.Sprintf("permission denied accessing %s", workdir) + out.Hint = fmt.Sprintf("check permissions on %s or a parent directory", workdir) + default: + out.Message = fmt.Sprintf("cannot stat %s: %v", workdir, err) + out.Hint = "check the path and its parent directories" + } return out } if !info.IsDir() { - out.Status = "error" + out.Status = CheckStatusError out.Message = fmt.Sprintf("%s is not a directory", workdir) out.Hint = "point --workdir at a directory, not a file" return out } f, err := os.CreateTemp(workdir, ".tracker_probe_*") if err != nil { - out.Status = "error" + out.Status = CheckStatusError out.Message = fmt.Sprintf("%s is not writable", workdir) out.Hint = fmt.Sprintf("check permissions: chmod u+w %s", workdir) return out @@ -570,7 +658,7 @@ func checkWorkdirLib(workdir string) CheckResult { home, _ := os.UserHomeDir() if workdir == home || workdir == "/" { out.Details = append(out.Details, CheckDetail{ - Status: "warn", + Status: CheckStatusWarn, Message: fmt.Sprintf("%s (risk of accidental data loss — use a project subdirectory)", workdir), }) hasWarn = true @@ -579,21 +667,21 @@ func checkWorkdirLib(workdir string) CheckResult { // Detect missing .gitignore entries without modifying the file. 
if missing := missingGitignoreEntries(workdir); missing != "" { out.Details = append(out.Details, CheckDetail{ - Status: "warn", + Status: CheckStatusWarn, Message: missing, }) hasWarn = true } out.Details = append(out.Details, CheckDetail{ - Status: "ok", + Status: CheckStatusOK, Message: fmt.Sprintf("%s (writable)", workdir), }) if hasWarn { - out.Status = "warn" + out.Status = CheckStatusWarn out.Message = fmt.Sprintf("%s is writable (with warnings)", workdir) } else { - out.Status = "ok" + out.Status = CheckStatusOK out.Message = fmt.Sprintf("%s is writable", workdir) } return out @@ -636,57 +724,68 @@ func missingGitignoreEntries(workdir string) string { return "" } -// checkArtifactDirsLib verifies the .ai artifact directory is usable +// checkArtifactDirs verifies the .ai artifact directory is usable // (either exists and is writable, or can be created). -func checkArtifactDirsLib(workdir string) CheckResult { +func checkArtifactDirs(workdir string) CheckResult { out := CheckResult{Name: "Artifact Directories"} allOk := true aiDir := filepath.Join(workdir, ".ai") - if info, err := os.Stat(aiDir); err == nil { + info, err := os.Stat(aiDir) + switch { + case err == nil: switch { case !info.IsDir(): out.Details = append(out.Details, CheckDetail{ - Status: "error", + Status: CheckStatusError, Message: ".ai is not a directory", }) allOk = false case !isDirWritable(aiDir): out.Details = append(out.Details, CheckDetail{ - Status: "error", + Status: CheckStatusError, Message: fmt.Sprintf("%s exists but is not writable", aiDir), Hint: fmt.Sprintf("check permissions: chmod u+w %s", aiDir), }) allOk = false default: out.Details = append(out.Details, CheckDetail{ - Status: "ok", + Status: CheckStatusOK, Message: fmt.Sprintf("%s exists and is writable", aiDir), }) } - } else { + case os.IsNotExist(err): if isDirWritable(workdir) { out.Details = append(out.Details, CheckDetail{ - Status: "ok", + Status: CheckStatusOK, Message: fmt.Sprintf("%s will be created on first 
run", aiDir), }) } else { out.Details = append(out.Details, CheckDetail{ - Status: "error", + Status: CheckStatusError, Message: fmt.Sprintf("%s cannot be created (parent not writable)", aiDir), }) allOk = false } + default: + // Non-ENOENT stat failure — permission denied, I/O error, etc. + // Report the real failure instead of pretending .ai is missing. + out.Details = append(out.Details, CheckDetail{ + Status: CheckStatusError, + Message: fmt.Sprintf("cannot inspect %s: %v", aiDir, err), + Hint: fmt.Sprintf("check permissions on %s and its parents", aiDir), + }) + allOk = false } if allOk { - out.Status = "ok" + out.Status = CheckStatusOK out.Message = "artifact directories writable" return out } // Promote to "error" if any detail is an error (not just a warning). - out.Status = "warn" + out.Status = CheckStatusWarn for _, d := range out.Details { - if d.Status == "error" { - out.Status = "error" + if d.Status == CheckStatusError { + out.Status = CheckStatusError break } } @@ -705,37 +804,46 @@ func isDirWritable(dir string) bool { return true } -// checkDiskSpaceLib warns when available disk space under workdir is low. +// checkDiskSpace warns when available disk space under workdir is low. // The implementation is platform-specific; see tracker_doctor_unix.go and // tracker_doctor_windows.go. -// checkPipelineFileLib parses and validates a pipeline file. -func checkPipelineFileLib(pipelineFile string) CheckResult { +// checkPipelineFile parses and validates a pipeline file. 
+func checkPipelineFile(pipelineFile string) CheckResult { out := CheckResult{Name: "Pipeline File"} if _, err := os.Stat(pipelineFile); err != nil { - out.Status = "error" - out.Message = fmt.Sprintf("%s does not exist", pipelineFile) - out.Hint = fmt.Sprintf("check the file path: %s", pipelineFile) + out.Status = CheckStatusError + switch { + case os.IsNotExist(err): + out.Message = fmt.Sprintf("%s does not exist", pipelineFile) + out.Hint = fmt.Sprintf("check the file path: %s", pipelineFile) + case os.IsPermission(err): + out.Message = fmt.Sprintf("permission denied reading %s", pipelineFile) + out.Hint = fmt.Sprintf("check permissions: chmod +r %s", pipelineFile) + default: + out.Message = fmt.Sprintf("cannot stat %s: %v", pipelineFile, err) + out.Hint = "check the file path and permissions" + } return out } hasWarn := false if !strings.HasSuffix(pipelineFile, ".dip") && !strings.HasSuffix(pipelineFile, ".dot") { out.Details = append(out.Details, CheckDetail{ - Status: "warn", + Status: CheckStatusWarn, Message: fmt.Sprintf("%s is not a .dip or .dot file — may not be a valid pipeline", pipelineFile), }) hasWarn = true } fileBytes, err := os.ReadFile(pipelineFile) if err != nil { - out.Status = "error" + out.Status = CheckStatusError out.Message = fmt.Sprintf("%s: read error: %v", pipelineFile, err) out.Hint = "check file permissions" return out } graph, err := parsePipelineSource(string(fileBytes), detectSourceFormat(string(fileBytes))) if err != nil { - out.Status = "error" + out.Status = CheckStatusError out.Message = fmt.Sprintf("%s: parse error: %v", pipelineFile, err) out.Hint = "run `tracker validate " + pipelineFile + "` for full details" return out @@ -745,17 +853,17 @@ func checkPipelineFileLib(pipelineFile string) CheckResult { if ve != nil && len(ve.Errors) > 0 { for _, e := range ve.Errors { out.Details = append(out.Details, CheckDetail{ - Status: "error", + Status: CheckStatusError, Message: fmt.Sprintf("error: %s", e), }) } for _, w := range 
ve.Warnings { out.Details = append(out.Details, CheckDetail{ - Status: "warn", + Status: CheckStatusWarn, Message: w, }) } - out.Status = "error" + out.Status = CheckStatusError out.Message = fmt.Sprintf("%s failed validation (%d error(s))", pipelineFile, len(ve.Errors)) out.Hint = "run `tracker validate " + pipelineFile + "` for full details" return out @@ -763,28 +871,28 @@ func checkPipelineFileLib(pipelineFile string) CheckResult { if ve != nil && len(ve.Warnings) > 0 { for _, w := range ve.Warnings { out.Details = append(out.Details, CheckDetail{ - Status: "warn", + Status: CheckStatusWarn, Message: w, }) } out.Details = append(out.Details, CheckDetail{ - Status: "ok", + Status: CheckStatusOK, Message: fmt.Sprintf("%s valid (%d nodes, %d edges, %d warning(s))", pipelineFile, len(graph.Nodes), len(graph.Edges), len(ve.Warnings)), }) - out.Status = "warn" + out.Status = CheckStatusWarn out.Message = fmt.Sprintf("%s valid with %d warning(s)", pipelineFile, len(ve.Warnings)) return out } out.Details = append(out.Details, CheckDetail{ - Status: "ok", + Status: CheckStatusOK, Message: fmt.Sprintf("%s valid (%d nodes, %d edges)", pipelineFile, len(graph.Nodes), len(graph.Edges)), }) if hasWarn { - out.Status = "warn" + out.Status = CheckStatusWarn out.Message = fmt.Sprintf("%s is valid but has warnings", pipelineFile) } else { - out.Status = "ok" + out.Status = CheckStatusOK out.Message = fmt.Sprintf("%s is valid", pipelineFile) } return out diff --git a/tracker_doctor_test.go b/tracker_doctor_test.go index 71d1569..edb9cc0 100644 --- a/tracker_doctor_test.go +++ b/tracker_doctor_test.go @@ -3,8 +3,11 @@ package tracker import ( + "context" "os" "path/filepath" + "runtime" + "strings" "testing" ) @@ -12,7 +15,7 @@ func TestDoctor_NoProbe_KeyPresent(t *testing.T) { workdir := t.TempDir() t.Setenv("ANTHROPIC_API_KEY", "sk-ant-test-12345678901234567890") - r, err := Doctor(DoctorConfig{WorkDir: workdir, ProbeProviders: false}) + r, err := Doctor(context.Background(), 
DoctorConfig{WorkDir: workdir, ProbeProviders: false}) if err != nil { t.Fatalf("Doctor: %v", err) } @@ -36,7 +39,7 @@ func TestDoctor_NoProviderKeys(t *testing.T) { t.Setenv(k, "") } - r, err := Doctor(DoctorConfig{WorkDir: workdir, ProbeProviders: false}) + r, err := Doctor(context.Background(), DoctorConfig{WorkDir: workdir, ProbeProviders: false}) if err != nil { t.Fatalf("Doctor: %v", err) } @@ -62,7 +65,7 @@ func TestDoctor_PipelineFileValidation(t *testing.T) { ` must(t, os.WriteFile(pf, []byte(src), 0o644)) - r, err := Doctor(DoctorConfig{WorkDir: workdir, PipelineFile: pf, ProbeProviders: false}) + r, err := Doctor(context.Background(), DoctorConfig{WorkDir: workdir, PipelineFile: pf, ProbeProviders: false}) if err != nil { t.Fatalf("Doctor: %v", err) } @@ -76,3 +79,141 @@ func TestDoctor_PipelineFileValidation(t *testing.T) { t.Fatal("Pipeline File check missing when PipelineFile set") } } + +func TestSanitizeProviderError(t *testing.T) { + cases := []struct { + name string + in string + want string + }{ + { + name: "anthropic key", + in: "auth failed: sk-ant-api03-abcdef1234567890abcdef", + want: "auth failed: [redacted-key]", + }, + { + name: "openai key", + in: "invalid key sk-abcdef1234567890abcdef", + want: "invalid key [redacted-key]", + }, + { + name: "google key", + in: "request failed AIzaSyAbcDef1234567890abcdef_01", + want: "request failed [redacted-key]", + }, + { + name: "bearer token", + in: "401 Unauthorized: Bearer abc.def.ghi12345", + want: "401 Unauthorized: Bearer [redacted]", + }, + { + name: "plain message", + in: "connection refused", + want: "connection refused", + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + if got := sanitizeProviderError(c.in); got != c.want { + t.Errorf("sanitizeProviderError(%q) = %q, want %q", c.in, got, c.want) + } + }) + } +} + +// TestSanitizeThenTrim_NoPartialKeyLeak verifies the sanitize-before-trim +// ordering in probeProvider. 
A key that straddles the trim boundary must +// not produce a leaked prefix after truncation. Regression guard for PR +// feedback on issue #106 follow-up. +func TestSanitizeThenTrim_NoPartialKeyLeak(t *testing.T) { + // Construct a message where the key starts at char 50 and runs past + // the 80-char truncation point. Trimming first would leave a 30-char + // prefix of the key that's shorter than the regex minimum, so the + // regex would miss it and the prefix would leak. + key := "sk-ant-api03-" + strings.Repeat("A", 60) + msg := strings.Repeat("x", 50) + key + + // Correct order: sanitize first, then trim. + got := trimErrMsg(sanitizeProviderError(msg), 80) + + if strings.Contains(got, "sk-ant-") { + t.Errorf("got = %q; leaked key prefix (must be redacted before trim)", got) + } + if !strings.Contains(got, "[redacted-key]") { + t.Errorf("got = %q; want [redacted-key] substitution", got) + } +} + +// TestCheckWorkdir_DistinguishesErrorKinds verifies that permission-denied +// and other non-ENOENT stat failures are reported with the right remediation +// hint, rather than being reported as "does not exist" + an mkdir hint. 
+func TestCheckWorkdir_DistinguishesErrorKinds(t *testing.T) { + t.Run("missing path", func(t *testing.T) { + r := checkWorkdir(filepath.Join(t.TempDir(), "does-not-exist")) + if r.Status != CheckStatusError { + t.Fatalf("status = %q, want error", r.Status) + } + if !strings.Contains(r.Message, "does not exist") { + t.Errorf("message = %q, want 'does not exist'", r.Message) + } + if !strings.Contains(r.Hint, "mkdir") { + t.Errorf("hint = %q, want mkdir suggestion", r.Hint) + } + }) + t.Run("permission denied", func(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("permission semantics differ on Windows") + } + if os.Geteuid() == 0 { + t.Skip("permission tests are meaningless as root") + } + parent := t.TempDir() + inner := filepath.Join(parent, "locked") + must(t, os.Mkdir(inner, 0o755)) + // Revoke search/execute on parent → stat of inner fails with EACCES. + must(t, os.Chmod(parent, 0o000)) + t.Cleanup(func() { _ = os.Chmod(parent, 0o755) }) + + r := checkWorkdir(inner) + if r.Status != CheckStatusError { + t.Fatalf("status = %q, want error", r.Status) + } + if strings.Contains(r.Hint, "mkdir") { + t.Errorf("permission error should not suggest mkdir, got hint = %q", r.Hint) + } + if !strings.Contains(r.Message, "permission denied") && + !strings.Contains(r.Message, "cannot stat") { + t.Errorf("message = %q, want permission-denied or stat-error wording", r.Message) + } + }) +} + +// TestCheckArtifactDirs_NonENOENTStatError verifies that a stat failure on +// .ai that is not ENOENT (e.g. permission denied) is reported as an error +// rather than silently treated as "will be created on first run". 
+func TestCheckArtifactDirs_NonENOENTStatError(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("permission semantics differ on Windows") + } + if os.Geteuid() == 0 { + t.Skip("permission tests are meaningless as root") + } + workdir := t.TempDir() + aiDir := filepath.Join(workdir, ".ai") + must(t, os.Mkdir(aiDir, 0o755)) + // Make workdir unreadable/unsearchable so stat on .ai fails with EACCES + // (not ENOENT — .ai does exist). + must(t, os.Chmod(workdir, 0o000)) + t.Cleanup(func() { _ = os.Chmod(workdir, 0o755) }) + + r := checkArtifactDirs(workdir) + if r.Status != CheckStatusError { + t.Fatalf("status = %q, want error (permission failure must not be reported as OK)", r.Status) + } + // No detail should say "will be created on first run" — that's the lie we're guarding against. + for _, d := range r.Details { + if strings.Contains(d.Message, "will be created") { + t.Errorf("detail %q hides the real permission error", d.Message) + } + } +} diff --git a/tracker_doctor_unix.go b/tracker_doctor_unix.go index 9b546dd..155f651 100644 --- a/tracker_doctor_unix.go +++ b/tracker_doctor_unix.go @@ -8,12 +8,12 @@ import ( "syscall" ) -func checkDiskSpaceLib(workdir string) CheckResult { +func checkDiskSpace(workdir string) CheckResult { var stat syscall.Statfs_t if err := syscall.Statfs(workdir, &stat); err != nil { return CheckResult{ Name: "Disk Space", - Status: "warn", + Status: CheckStatusWarn, Message: fmt.Sprintf("could not determine disk space: %v", err), } } @@ -23,14 +23,14 @@ func checkDiskSpaceLib(workdir string) CheckResult { if availableGB < minGB { return CheckResult{ Name: "Disk Space", - Status: "warn", + Status: CheckStatusWarn, Message: fmt.Sprintf("low disk space: %.2f GB available (recommended: %.1f GB+)", availableGB, minGB), Hint: "free up disk space before running long pipelines", } } return CheckResult{ Name: "Disk Space", - Status: "ok", + Status: CheckStatusOK, Message: fmt.Sprintf("%.2f GB available", availableGB), } } diff --git 
a/tracker_doctor_windows.go b/tracker_doctor_windows.go index 9003275..da14707 100644 --- a/tracker_doctor_windows.go +++ b/tracker_doctor_windows.go @@ -3,10 +3,10 @@ package tracker -func checkDiskSpaceLib(_ string) CheckResult { +func checkDiskSpace(_ string) CheckResult { return CheckResult{ Name: "Disk Space", - Status: "ok", + Status: CheckStatusOK, Message: "disk space check not available on Windows", } } diff --git a/tracker_events.go b/tracker_events.go index af57df0..6725f0a 100644 --- a/tracker_events.go +++ b/tracker_events.go @@ -17,9 +17,9 @@ import ( const ndjsonTimestampLayout = "2006-01-02T15:04:05.000Z07:00" -// NDJSONEvent is the stable wire format for the tracker --json mode. Field +// StreamEvent is the stable wire format for the tracker --json mode. Field // tags are stable; new optional fields may be added without a major bump. -type NDJSONEvent struct { +type StreamEvent struct { Timestamp string `json:"ts"` Source string `json:"source"` // "pipeline", "llm", "agent" Type string `json:"type"` // event type within source @@ -33,13 +33,21 @@ type NDJSONEvent struct { Content string `json:"content,omitempty"` // text content (LLM output, tool output) } -// NDJSONWriter is a thread-safe writer that serializes NDJSONEvents line by +// NDJSONWriter is a thread-safe writer that serializes StreamEvents line by // line onto an io.Writer. Library consumers use it to produce the same // stream as the tracker CLI's --json mode. +// +// Backpressure note: Write holds an internal mutex for the duration of the +// underlying io.Writer.Write call. When three handler sources (pipeline, +// agent, LLM trace) share one writer, a slow backing writer serializes +// handler callbacks across those sources. If the backing writer can block +// (network socket, pipe), wrap it in a bufio.Writer or a channel-backed +// forwarder to decouple producers from the slow sink. 
type NDJSONWriter struct { - mu sync.Mutex - w io.Writer - errOnce sync.Once + mu sync.Mutex + w io.Writer + errOnce sync.Once + panicOnce sync.Once } // NewNDJSONWriter returns a new writer backed by w. @@ -47,28 +55,41 @@ func NewNDJSONWriter(w io.Writer) *NDJSONWriter { return &NDJSONWriter{w: w} } -// Write serializes evt as a JSON line. Safe to call from multiple goroutines. -func (s *NDJSONWriter) Write(evt NDJSONEvent) { +// Write serializes evt as a JSON line. Safe to call from multiple +// goroutines. Returns a non-nil error if marshalling or writing to the +// underlying io.Writer fails, including short writes (n < len(data) +// with a nil error, which violates the io.Writer contract). The first write +// error is also logged to os.Stderr once so long-running callers that +// ignore the return value still surface it. +func (s *NDJSONWriter) Write(evt StreamEvent) error { data, err := json.Marshal(evt) if err != nil { - fmt.Fprintf(os.Stderr, "tracker: marshal NDJSON event: %v\n", err) - return + return fmt.Errorf("marshal NDJSON event: %w", err) } data = append(data, '\n') s.mu.Lock() defer s.mu.Unlock() - if _, werr := s.w.Write(data); werr != nil { + n, werr := s.w.Write(data) + if werr == nil && n < len(data) { + werr = io.ErrShortWrite + } + if werr != nil { s.errOnce.Do(func() { fmt.Fprintf(os.Stderr, "tracker: NDJSON stream write error: %v (further write errors suppressed)\n", werr) }) + return werr } + return nil } // PipelineHandler returns a pipeline.PipelineEventHandler that writes events -// to this stream. +// to this stream. Panics in the underlying writer are recovered and logged +// to os.Stderr once (per writer instance) so a misbehaving sink cannot +// crash the pipeline goroutine. 
func (s *NDJSONWriter) PipelineHandler() pipeline.PipelineEventHandler { return pipeline.PipelineEventHandlerFunc(func(evt pipeline.PipelineEvent) { - entry := NDJSONEvent{ + defer s.recoverPanic("pipeline") + entry := StreamEvent{ Timestamp: evt.Timestamp.Format(ndjsonTimestampLayout), Source: "pipeline", Type: string(evt.Type), @@ -79,14 +100,17 @@ func (s *NDJSONWriter) PipelineHandler() pipeline.PipelineEventHandler { if evt.Err != nil { entry.Error = evt.Err.Error() } - s.Write(entry) + _ = s.Write(entry) }) } -// TraceObserver returns an llm.TraceObserver that writes trace events to this stream. +// TraceObserver returns an llm.TraceObserver that writes trace events to +// this stream. Panics in the underlying writer are recovered (see +// PipelineHandler). func (s *NDJSONWriter) TraceObserver() llm.TraceObserver { return llm.TraceObserverFunc(func(evt llm.TraceEvent) { - s.Write(NDJSONEvent{ + defer s.recoverPanic("llm") + _ = s.Write(StreamEvent{ Timestamp: time.Now().Format(ndjsonTimestampLayout), Source: "llm", Type: string(evt.Kind), @@ -98,9 +122,12 @@ func (s *NDJSONWriter) TraceObserver() llm.TraceObserver { }) } -// AgentHandler returns an agent.EventHandler that writes agent events to this stream. +// AgentHandler returns an agent.EventHandler that writes agent events to +// this stream. Panics in the underlying writer are recovered (see +// PipelineHandler). 
func (s *NDJSONWriter) AgentHandler() agent.EventHandler { return agent.EventHandlerFunc(func(evt agent.Event) { + defer s.recoverPanic("agent") content := evt.ToolOutput if content == "" { content = evt.Text @@ -109,7 +136,7 @@ func (s *NDJSONWriter) AgentHandler() agent.EventHandler { if ts.IsZero() { ts = time.Now() } - entry := NDJSONEvent{ + entry := StreamEvent{ Timestamp: ts.Format(ndjsonTimestampLayout), Source: "agent", Type: string(evt.Type), @@ -119,12 +146,25 @@ func (s *NDJSONWriter) AgentHandler() agent.EventHandler { ToolName: evt.ToolName, Content: content, } - entry.Error = buildNDJSONEntryError(evt) - s.Write(entry) + entry.Error = buildStreamEntryError(evt) + _ = s.Write(entry) }) } -func buildNDJSONEntryError(evt agent.Event) string { +// recoverPanic recovers from a handler panic and logs the first occurrence +// per writer instance. Using a per-instance sync.Once (not package-level) +// means multiple NDJSONWriter instances (e.g., different runs streaming to +// different sinks) each get their own suppression state, so one misbehaving +// sink does not silence unrelated panics elsewhere in the process. +func (s *NDJSONWriter) recoverPanic(source string) { + if r := recover(); r != nil { + s.panicOnce.Do(func() { + fmt.Fprintf(os.Stderr, "tracker: NDJSON %s handler recovered from panic: %v (further panics suppressed)\n", source, r) + }) + } +} + +func buildStreamEntryError(evt agent.Event) string { if evt.ToolError == "" && evt.Err == nil { return "" } diff --git a/tracker_events_test.go b/tracker_events_test.go index 113a12d..359c8b5 100644 --- a/tracker_events_test.go +++ b/tracker_events_test.go @@ -1,4 +1,4 @@ -// ABOUTME: Tests for the public NDJSONWriter and NDJSONEvent types in the tracker package. +// ABOUTME: Tests for the public NDJSONWriter and StreamEvent types in the tracker package. // ABOUTME: Covers write, stable JSON tags, concurrency, and handler factory methods. 
package tracker @@ -6,6 +6,8 @@ import ( "bytes" "encoding/json" "errors" + "io" + "os" "strings" "sync" "testing" @@ -19,10 +21,10 @@ import ( func TestNDJSONWriter_Write(t *testing.T) { var buf bytes.Buffer w := NewNDJSONWriter(&buf) - w.Write(NDJSONEvent{Timestamp: "2026-04-17T10:00:00Z", Source: "pipeline", Type: "stage_started", NodeID: "N1"}) + w.Write(StreamEvent{Timestamp: "2026-04-17T10:00:00Z", Source: "pipeline", Type: "stage_started", NodeID: "N1"}) line := strings.TrimSuffix(buf.String(), "\n") - var got NDJSONEvent + var got StreamEvent if err := json.Unmarshal([]byte(line), &got); err != nil { t.Fatalf("unmarshal: %v", err) } @@ -34,7 +36,7 @@ func TestNDJSONWriter_Write(t *testing.T) { func TestNDJSONWriter_StableJSONTags(t *testing.T) { var buf bytes.Buffer w := NewNDJSONWriter(&buf) - w.Write(NDJSONEvent{ + w.Write(StreamEvent{ Timestamp: "t", Source: "agent", Type: "tool_call", RunID: "r1", NodeID: "n1", Message: "m", Error: "e", Provider: "p", Model: "mo", ToolName: "tn", Content: "c", @@ -60,7 +62,7 @@ func TestNDJSONWriter_ConcurrentWrites(t *testing.T) { for i := 0; i < n; i++ { go func() { defer wg.Done() - w.Write(NDJSONEvent{Timestamp: "t", Source: "pipeline", Type: "x"}) + w.Write(StreamEvent{Timestamp: "t", Source: "pipeline", Type: "x"}) }() } wg.Wait() @@ -70,7 +72,7 @@ func TestNDJSONWriter_ConcurrentWrites(t *testing.T) { t.Fatalf("got %d lines, want %d", len(lines), n) } for i, l := range lines { - var evt NDJSONEvent + var evt StreamEvent if err := json.Unmarshal([]byte(l), &evt); err != nil { t.Fatalf("line %d: unmarshal: %v; got %q", i, err, l) } @@ -80,7 +82,7 @@ func TestNDJSONWriter_ConcurrentWrites(t *testing.T) { func TestNDJSONWriter_OmitsEmptyFields(t *testing.T) { var buf bytes.Buffer w := NewNDJSONWriter(&buf) - w.Write(NDJSONEvent{ + w.Write(StreamEvent{ Timestamp: "2026-04-17T10:00:00Z", Source: "llm", Type: "request_start", @@ -108,7 +110,7 @@ func TestNDJSONWriter_PipelineHandler(t *testing.T) { Message: "started", 
}) - var evt NDJSONEvent + var evt StreamEvent if err := json.Unmarshal([]byte(strings.TrimSpace(buf.String())), &evt); err != nil { t.Fatalf("invalid JSON: %v", err) } @@ -142,7 +144,7 @@ func TestNDJSONWriter_PipelineHandlerWithError(t *testing.T) { Err: errors.New("context cancelled"), }) - var evt NDJSONEvent + var evt StreamEvent if err := json.Unmarshal([]byte(strings.TrimSpace(buf.String())), &evt); err != nil { t.Fatalf("invalid JSON: %v", err) } @@ -163,7 +165,7 @@ func TestNDJSONWriter_TraceObserver(t *testing.T) { Preview: "hello world", }) - var evt NDJSONEvent + var evt StreamEvent if err := json.Unmarshal([]byte(strings.TrimSpace(buf.String())), &evt); err != nil { t.Fatalf("invalid JSON: %v", err) } @@ -193,7 +195,7 @@ func TestNDJSONWriter_TraceObserverWithToolName(t *testing.T) { ToolName: "execute_command", }) - var evt NDJSONEvent + var evt StreamEvent if err := json.Unmarshal([]byte(strings.TrimSpace(buf.String())), &evt); err != nil { t.Fatalf("invalid JSON: %v", err) } @@ -215,7 +217,7 @@ func TestNDJSONWriter_AgentHandler(t *testing.T) { ToolOutput: "file contents here", }) - var evt NDJSONEvent + var evt StreamEvent if err := json.Unmarshal([]byte(strings.TrimSpace(buf.String())), &evt); err != nil { t.Fatalf("invalid JSON: %v", err) } @@ -243,7 +245,7 @@ func TestNDJSONWriter_AgentHandlerFallsBackToText(t *testing.T) { Text: "thinking about the problem", }) - var evt NDJSONEvent + var evt StreamEvent if err := json.Unmarshal([]byte(strings.TrimSpace(buf.String())), &evt); err != nil { t.Fatalf("invalid JSON: %v", err) } @@ -263,7 +265,7 @@ func TestNDJSONWriter_AgentHandlerWithToolError(t *testing.T) { ToolError: "command timed out", }) - var evt NDJSONEvent + var evt StreamEvent if err := json.Unmarshal([]byte(strings.TrimSpace(buf.String())), &evt); err != nil { t.Fatalf("invalid JSON: %v", err) } @@ -284,7 +286,7 @@ func TestNDJSONWriter_AgentHandlerCombinesErrors(t *testing.T) { Err: errors.New("process killed"), }) - var evt 
NDJSONEvent + var evt StreamEvent if err := json.Unmarshal([]byte(strings.TrimSpace(buf.String())), &evt); err != nil { t.Fatalf("invalid JSON: %v", err) } @@ -300,7 +302,7 @@ func TestNDJSONWriter_AgentHandler_ContentPriority(t *testing.T) { h := w.AgentHandler() h.HandleEvent(agent.Event{Type: agent.EventToolCallEnd, ToolOutput: "tool-out", Text: "text-out"}) - var got NDJSONEvent + var got StreamEvent if err := json.Unmarshal(bytes.TrimRight(buf.Bytes(), "\n"), &got); err != nil { t.Fatalf("unmarshal: %v", err) } @@ -320,7 +322,7 @@ func TestNDJSONWriter_AgentHandlerErrorOnlyFromErr(t *testing.T) { Err: errors.New("session failed"), }) - var evt NDJSONEvent + var evt StreamEvent if err := json.Unmarshal([]byte(strings.TrimSpace(buf.String())), &evt); err != nil { t.Fatalf("invalid JSON: %v", err) } @@ -328,3 +330,133 @@ func TestNDJSONWriter_AgentHandlerErrorOnlyFromErr(t *testing.T) { t.Errorf("error = %q, want 'session failed'", evt.Error) } } + +// errWriter always returns an error on Write, used to exercise NDJSONWriter's +// error-propagation path. +type errWriter struct{ err error } + +func (e *errWriter) Write(_ []byte) (int, error) { return 0, e.err } + +// shortWriter returns n < len(p) with a nil error — a contract-violating +// io.Writer behavior that NDJSONWriter must treat as a stream error so +// silent truncation cannot sneak through. 
+type shortWriter struct{} + +func (shortWriter) Write(p []byte) (int, error) { + if len(p) == 0 { + return 0, nil + } + return len(p) - 1, nil // one byte short, nil error +} + +func TestNDJSONWriter_Write_ShortWriteIsError(t *testing.T) { + w := NewNDJSONWriter(shortWriter{}) + err := w.Write(StreamEvent{Timestamp: "t", Source: "pipeline", Type: "x"}) + if err != io.ErrShortWrite { + t.Errorf("err = %v, want io.ErrShortWrite", err) + } +} + +func TestNDJSONWriter_WriteReturnsError(t *testing.T) { + w := NewNDJSONWriter(&errWriter{err: errors.New("sink closed")}) + err := w.Write(StreamEvent{Timestamp: "t", Source: "pipeline", Type: "stage_started"}) + if err == nil { + t.Fatal("expected non-nil error when backing writer fails") + } + if !strings.Contains(err.Error(), "sink closed") { + t.Errorf("error = %q, want to contain 'sink closed'", err.Error()) + } +} + +// panicWriter panics on Write, used to exercise handler panic-recovery paths. +type panicWriter struct{} + +func (p *panicWriter) Write(_ []byte) (int, error) { panic("sink exploded") } + +func TestNDJSONWriter_PipelineHandler_PanicRecovery(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatalf("PipelineHandler should recover from panic, got: %v", r) + } + }() + w := NewNDJSONWriter(&panicWriter{}) + w.PipelineHandler().HandlePipelineEvent(pipeline.PipelineEvent{ + Type: pipeline.EventPipelineStarted, + Timestamp: time.Now(), + RunID: "r", + }) +} + +func TestNDJSONWriter_AgentHandler_PanicRecovery(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatalf("AgentHandler should recover from panic, got: %v", r) + } + }() + w := NewNDJSONWriter(&panicWriter{}) + w.AgentHandler().HandleEvent(agent.Event{Type: agent.EventTextDelta, Text: "x"}) +} + +func TestNDJSONWriter_TraceObserver_PanicRecovery(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatalf("TraceObserver should recover from panic, got: %v", r) + } + }() + w := 
NewNDJSONWriter(&panicWriter{}) + w.TraceObserver().HandleTraceEvent(llm.TraceEvent{Kind: llm.TraceRequestStart}) +} + +// TestNDJSONWriter_PanicSuppressionIsPerInstance verifies that one writer +// recovering from a panic does not silence panic logging on a separate +// writer instance. Both writers must independently emit their first +// panic line to os.Stderr; regressions here mean package-level state +// re-crept in (a package-level sync.Once would log once and silently +// swallow the second, which the assertion below catches). +func TestNDJSONWriter_PanicSuppressionIsPerInstance(t *testing.T) { + defer func() { + if r := recover(); r != nil { + t.Fatalf("handlers should recover, got: %v", r) + } + }() + + stderr := captureStderr(t, func() { + w1 := NewNDJSONWriter(&panicWriter{}) + w2 := NewNDJSONWriter(&panicWriter{}) + w1.PipelineHandler().HandlePipelineEvent(pipeline.PipelineEvent{ + Type: pipeline.EventPipelineStarted, Timestamp: time.Now(), + }) + w2.PipelineHandler().HandlePipelineEvent(pipeline.PipelineEvent{ + Type: pipeline.EventPipelineStarted, Timestamp: time.Now(), + }) + }) + + // Count the recovered-panic log lines. Each writer must emit its own. + // Package-level suppression would produce exactly 1; per-instance produces 2. + got := strings.Count(stderr, "NDJSON pipeline handler recovered from panic") + if got != 2 { + t.Errorf("got %d panic log lines, want 2 (each writer instance logs independently); stderr was:\n%s", got, stderr) + } +} + +// captureStderr swaps os.Stderr for an os.Pipe for the duration of fn, +// returning the captured output as a string. 
+func captureStderr(t *testing.T, fn func()) string { + t.Helper() + r, w, err := os.Pipe() + if err != nil { + t.Fatalf("os.Pipe: %v", err) + } + origStderr := os.Stderr + os.Stderr = w + t.Cleanup(func() { os.Stderr = origStderr }) + + fn() + w.Close() + + data, err := io.ReadAll(r) + if err != nil { + t.Fatalf("read captured stderr: %v", err) + } + return string(data) +} diff --git a/tracker_simulate.go b/tracker_simulate.go index e7960d8..11cce98 100644 --- a/tracker_simulate.go +++ b/tracker_simulate.go @@ -3,6 +3,7 @@ package tracker import ( + "context" "fmt" "sort" @@ -47,8 +48,22 @@ type PlanStep struct { Edges []SimEdge `json:"edges,omitempty"` } -// Simulate parses source and returns a SimulateReport. Format is detected from content. -func Simulate(source string) (*SimulateReport, error) { +// Simulate parses source and returns a SimulateReport. Format is detected +// from content. +// +// ctx is checked at entry so a caller that passes an already-cancelled +// context gets an immediate error instead of silent work. Full +// cancellation mid-parse would require threading ctx through +// parsePipelineSource → dippin-lang's parser, which is out of scope +// today (parses are fast and O(n) anyway). Nil is coalesced to +// context.Background(). 
+func Simulate(ctx context.Context, source string) (*SimulateReport, error) { + if ctx == nil { + ctx = context.Background() + } + if err := ctx.Err(); err != nil { + return nil, err + } format := detectSourceFormat(source) graph, err := parsePipelineSource(source, format) if err != nil { diff --git a/tracker_simulate_test.go b/tracker_simulate_test.go index c69cf91..7c54836 100644 --- a/tracker_simulate_test.go +++ b/tracker_simulate_test.go @@ -2,6 +2,7 @@ package tracker import ( + "context" "strings" "testing" ) @@ -24,7 +25,7 @@ const simpleSource = `workflow X ` func TestSimulate_BasicGraph(t *testing.T) { - r, err := Simulate(simpleSource) + r, err := Simulate(context.Background(), simpleSource) if err != nil { t.Fatalf("Simulate: %v", err) } @@ -57,7 +58,7 @@ func TestSimulate_UnreachableDetection(t *testing.T) { Start -> S; S -> E; }` - r, err := Simulate(src) + r, err := Simulate(context.Background(), src) if err != nil { t.Fatalf("Simulate: %v", err) } @@ -81,7 +82,7 @@ func TestSimulate_EdgeConditionPropagated(t *testing.T) { edges S -> E when ctx.outcome = success ` - r, err := Simulate(src) + r, err := Simulate(context.Background(), src) if err != nil { t.Fatalf("Simulate: %v", err) } @@ -94,12 +95,23 @@ func TestSimulate_EdgeConditionPropagated(t *testing.T) { } func TestSimulate_InvalidSource(t *testing.T) { - _, err := Simulate("this is not a pipeline") + _, err := Simulate(context.Background(), "this is not a pipeline") if err == nil { t.Fatal("expected error for invalid source") } } +// TestSimulate_CtxCancelledAtEntry verifies Simulate returns the caller's +// cancellation error immediately rather than silently parsing. 
+func TestSimulate_CtxCancelledAtEntry(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + _, err := Simulate(ctx, simpleSource) + if err != context.Canceled { + t.Errorf("err = %v, want context.Canceled", err) + } +} + func TestSimulate_GraphAttrsPopulated(t *testing.T) { // Use DOT format to set graph-level attributes reliably without // depending on dippin-lang's specific syntax for workflow-level attrs. @@ -109,7 +121,7 @@ func TestSimulate_GraphAttrsPopulated(t *testing.T) { End [shape=Msquare label="End"]; Start -> End; }` - r, err := Simulate(src) + r, err := Simulate(context.Background(), src) if err != nil { t.Fatalf("Simulate: %v", err) }