diff --git a/internal/agent/dreaming/training_data.go b/internal/agent/dreaming/training_data.go index 08822a6e..b18cc488 100644 --- a/internal/agent/dreaming/training_data.go +++ b/internal/agent/dreaming/training_data.go @@ -137,6 +137,20 @@ func (da *DreamingAgent) AssembleTrainingBatch(ctx context.Context, outputDir st totalWritten++ } + // Mark all assembled entries as used so they don't re-trigger training. + // This is critical: without it, CountUntrainedExperience never drops, + // and every dreaming cycle re-triggers training on the same data. + var usedEntryIDs []string + for _, entry := range goldEntries { + usedEntryIDs = append(usedEntryIDs, entry.ID) + } + for _, entry := range corrected { + usedEntryIDs = append(usedEntryIDs, entry.ID) + } + if err := da.store.MarkExperienceUsedInTraining(ctx, batchID, usedEntryIDs); err != nil { + da.log.Warn("failed to mark experience as used in training", "error", err, "count", len(usedEntryIDs)) + } + manifest := &TrainingBatchManifest{ ID: batchID, CreatedAt: time.Now(), diff --git a/internal/agent/dreaming/training_trigger.go b/internal/agent/dreaming/training_trigger.go index aedab5af..b78e098d 100644 --- a/internal/agent/dreaming/training_trigger.go +++ b/internal/agent/dreaming/training_trigger.go @@ -63,6 +63,19 @@ func TrainingRequestsDir() string { return filepath.Join(homeDir, ".mnemonic", "training_requests") } +// TrainingDisabledPath returns the path to the e-stop sentinel file. +// When this file exists, all auto and manual training is blocked. +func TrainingDisabledPath() string { + homeDir, _ := os.UserHomeDir() + return filepath.Join(homeDir, ".mnemonic", "training.disabled") +} + +// isTrainingDisabled checks for the e-stop sentinel file. +func isTrainingDisabled() bool { + _, err := os.Stat(TrainingDisabledPath()) + return err == nil +} + // trainingCheck runs Phase 4.85: check if we should trigger spoke training. // Only runs during dreaming if auto-trigger is enabled. Also callable via MCP. 
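//
// Operator workflow for the e-stop (a usage sketch, not code from this PR;
// only the sentinel path and semantics come from TrainingDisabledPath and
// isTrainingDisabled above):
//
//	touch ~/.mnemonic/training.disabled   # block all auto and manual training
//	rm ~/.mnemonic/training.disabled      # re-enable training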
func (da *DreamingAgent) trainingCheck(ctx context.Context, clCfg config.ContinuousLearningConfig) (*TrainingResult, error) { @@ -74,12 +87,53 @@ func (da *DreamingAgent) trainingCheck(ctx context.Context, clCfg config.Continu return nil, nil } + // E-stop: check for sentinel file + if isTrainingDisabled() { + da.log.Warn("training disabled by e-stop file", "path", TrainingDisabledPath()) + return nil, nil + } + // Check training window if clCfg.Trigger.TrainingWindow != "" && !inTrainingWindow(clCfg.Trigger.TrainingWindow) { da.log.Debug("outside training window, skipping", "window", clCfg.Trigger.TrainingWindow) return nil, nil } + // Circuit breaker: stop after too many consecutive failures + maxFailures := clCfg.Trigger.MaxConsecutiveFailures + if maxFailures <= 0 { + maxFailures = 3 + } + consecutiveFailures, err := da.store.CountConsecutiveFailedTrainingRuns(ctx) + if err != nil { + da.log.Warn("failed to check consecutive training failures", "error", err) + } else if consecutiveFailures >= maxFailures { + da.log.Warn("training circuit breaker open: too many consecutive failures", + "consecutive_failures", consecutiveFailures, "max", maxFailures) + return nil, nil + } + + // Cooldown: don't re-trigger too soon after a failed run + cooldownHours := clCfg.Trigger.FailureCooldownHours + if cooldownHours <= 0 { + cooldownHours = 24 + } + if consecutiveFailures > 0 { + lastEnd, endErr := da.store.GetLastTrainingRunEndTime(ctx) + if endErr != nil { + da.log.Warn("failed to check last training run time", "error", endErr) + } else if !lastEnd.IsZero() { + cooldown := time.Duration(cooldownHours) * time.Hour + if time.Since(lastEnd) < cooldown { + da.log.Info("training skipped: cooling down after failure", + "last_run_ended", lastEnd.Format(time.RFC3339), + "cooldown_hours", cooldownHours, + "consecutive_failures", consecutiveFailures) + return nil, nil + } + } + } + return da.RunTrainingCycle(ctx, clCfg, "auto") } diff --git a/internal/agent/dreaming/training_trigger_test.go b/internal/agent/dreaming/training_trigger_test.go index a271c0b2..537156bb 100644 --- a/internal/agent/dreaming/training_trigger_test.go +++ b/internal/agent/dreaming/training_trigger_test.go @@ -18,13 +18,16 @@ import ( // triggerMockStore provides controlled responses for training trigger tests. 
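// The consecutiveFailures and lastTrainingRunEndTime fields drive the new
// circuit-breaker and cooldown paths; markedUsedEntryIDs records the IDs passed
// to MarkExperienceUsedInTraining so tests can assert on the batch-marking call.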
type triggerMockStore struct { storetest.MockStore - untrainedCount int - goldEntries []store.ExperienceEntry - needsImpEntries []store.ExperienceEntry - rawMemories map[string]store.RawMemory - memories map[string]store.Memory - trainingRunsW []store.TrainingRun - trainingRunsU []store.TrainingRun + untrainedCount int + goldEntries []store.ExperienceEntry + needsImpEntries []store.ExperienceEntry + rawMemories map[string]store.RawMemory + memories map[string]store.Memory + trainingRunsW []store.TrainingRun + trainingRunsU []store.TrainingRun + consecutiveFailures int + lastTrainingRunEndTime time.Time + markedUsedEntryIDs []string } func (m *triggerMockStore) CountUntrainedExperience(_ context.Context) (int, error) { @@ -73,6 +76,19 @@ func (m *triggerMockStore) UpdateTrainingRun(_ context.Context, run store.Traini return nil } +func (m *triggerMockStore) CountConsecutiveFailedTrainingRuns(_ context.Context) (int, error) { + return m.consecutiveFailures, nil +} + +func (m *triggerMockStore) GetLastTrainingRunEndTime(_ context.Context) (time.Time, error) { + return m.lastTrainingRunEndTime, nil +} + +func (m *triggerMockStore) MarkExperienceUsedInTraining(_ context.Context, _ string, entryIDs []string) error { + m.markedUsedEntryIDs = append(m.markedUsedEntryIDs, entryIDs...) + return nil +} + func baseCLConfig() config.ContinuousLearningConfig { return config.ContinuousLearningConfig{ Enabled: true, @@ -340,6 +356,136 @@ func TestPickUpTrainingResult_FailedRun(t *testing.T) { } } +func TestTrainingCheck_CircuitBreakerBlocks(t *testing.T) { + ms := &triggerMockStore{ + untrainedCount: 100, + consecutiveFailures: 3, + } + agent := NewDreamingAgent(ms, nil, DreamingConfig{Interval: time.Hour}, slog.New(slog.NewTextHandler(io.Discard, nil))) + + clCfg := baseCLConfig() + clCfg.Trigger.MaxConsecutiveFailures = 3 + + result, err := agent.trainingCheck(context.Background(), clCfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result != nil { + t.Fatal("expected nil result when circuit breaker is open") + } +} + +func TestTrainingCheck_CooldownBlocks(t *testing.T) { + ms := &triggerMockStore{ + untrainedCount: 100, + consecutiveFailures: 1, + lastTrainingRunEndTime: time.Now().Add(-30 * time.Minute), // 30 min ago + } + agent := NewDreamingAgent(ms, nil, DreamingConfig{Interval: time.Hour}, slog.New(slog.NewTextHandler(io.Discard, nil))) + + clCfg := baseCLConfig() + clCfg.Trigger.FailureCooldownHours = 24 + + result, err := agent.trainingCheck(context.Background(), clCfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result != nil { + t.Fatal("expected nil result during cooldown period") + } +} + +func TestTrainingCheck_AllowsAfterCooldown(t *testing.T) { + tmpDir := t.TempDir() + t.Setenv("MNEMONIC_TRAINING_REQUESTS_DIR", tmpDir) + + ms := &triggerMockStore{ + untrainedCount: 100, + consecutiveFailures: 1, + lastTrainingRunEndTime: time.Now().Add(-25 * time.Hour), // 25h ago + goldEntries: []store.ExperienceEntry{ + {ID: "e1", RawID: "raw-1", MemoryID: "mem-1", EncodingEPR: 0.95, Category: "gold"}, + }, + rawMemories: map[string]store.RawMemory{ + "raw-1": {ID: "raw-1", Content: "Test", Source: "mcp", Type: "general"}, + }, + memories: map[string]store.Memory{ + "mem-1": {ID: "mem-1", Summary: "test", Content: "test", Concepts: []string{"test"}, Salience: 0.5}, + }, + } + agent := NewDreamingAgent(ms, nil, DreamingConfig{Interval: time.Hour}, slog.New(slog.NewTextHandler(io.Discard, nil))) + + clCfg := baseCLConfig() + clCfg.Trigger.FailureCooldownHours 
= 24 + + result, err := agent.trainingCheck(context.Background(), clCfg) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result == nil { + t.Fatal("expected training to proceed after cooldown expires") + } + if result.Status != "training_requested" { + t.Errorf("expected status 'training_requested', got %q", result.Status) + } +} + +func TestTrainingCheck_EStopBlocks(t *testing.T) { + tmpDir := t.TempDir() + estopPath := filepath.Join(tmpDir, ".mnemonic", "training.disabled") + if err := os.MkdirAll(filepath.Dir(estopPath), 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(estopPath, []byte("stopped"), 0o644); err != nil { + t.Fatal(err) + } + // Override HOME so isTrainingDisabled() finds the sentinel + t.Setenv("HOME", tmpDir) + + ms := &triggerMockStore{untrainedCount: 100} + agent := NewDreamingAgent(ms, nil, DreamingConfig{Interval: time.Hour}, slog.New(slog.NewTextHandler(io.Discard, nil))) + + result, err := agent.trainingCheck(context.Background(), baseCLConfig()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if result != nil { + t.Fatal("expected nil result when e-stop file exists") + } +} + +func TestAssembleTrainingBatch_MarksExperienceAsUsed(t *testing.T) { + tmpDir := t.TempDir() + + ms := &triggerMockStore{ + untrainedCount: 10, + goldEntries: []store.ExperienceEntry{ + {ID: "e1", RawID: "raw-1", MemoryID: "mem-1", EncodingEPR: 0.95, Category: "gold"}, + {ID: "e2", RawID: "raw-2", MemoryID: "mem-2", EncodingEPR: 0.90, Category: "gold"}, + }, + rawMemories: map[string]store.RawMemory{ + "raw-1": {ID: "raw-1", Content: "First event", Source: "mcp", Type: "general"}, + "raw-2": {ID: "raw-2", Content: "Second event", Source: "mcp", Type: "general"}, + }, + memories: map[string]store.Memory{ + "mem-1": {ID: "mem-1", Summary: "first", Content: "first content", Concepts: []string{"test"}, Salience: 0.5}, + "mem-2": {ID: "mem-2", Summary: "second", Content: "second content", Concepts: []string{"test"}, Salience: 0.5}, + }, + } + + agent := NewDreamingAgent(ms, nil, DreamingConfig{Interval: time.Hour}, slog.New(slog.NewTextHandler(io.Discard, nil))) + + _, err := agent.AssembleTrainingBatch(context.Background(), tmpDir, 50) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // Both gold entries should be marked as used + if len(ms.markedUsedEntryIDs) != 2 { + t.Fatalf("expected 2 entries marked as used, got %d", len(ms.markedUsedEntryIDs)) + } +} + func TestInTrainingWindow(t *testing.T) { tests := []struct { name string diff --git a/internal/agent/episoding/agent.go b/internal/agent/episoding/agent.go index a24d2a38..7604a66b 100644 --- a/internal/agent/episoding/agent.go +++ b/internal/agent/episoding/agent.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "log/slog" + "strings" "sync" "time" @@ -364,82 +365,69 @@ func (ea *EpisodingAgent) closeEpisode(ctx context.Context, ep *store.Episode) e return nil } - // Build LLM prompt for episode synthesis - eventsStr := "" - for _, t := range eventTexts { - eventsStr += t + "\n\n" - } - - // Detect if episode contains MCP-source events (Claude Code interaction) - hasMCPEvents := false - for _, rawID := range ep.RawMemoryIDs { - raw, err := ea.store.GetRaw(ctx, rawID) - if err != nil { - continue - } - if raw.Source == "mcp" { - hasMCPEvents = true - break - } - } + // Truncate events to fit context budget (keep first + last, fill middle) + eventsStr := truncateEventsForPrompt(eventTexts, maxEventChars) - var prompt string - if hasMCPEvents { - // Claude-aware prompt: 
emphasize the collaborative creative journey - prompt = fmt.Sprintf(`You're looking at a chapter from a creative collaboration — a human and AI building something together. What's the story of this session? + // Build directive prompt — TASK/RULES/SCHEMA format (no placeholder values) + prompt := fmt.Sprintf(`TASK: Summarize these events into a single episode. -Look for the arc: What problem were they trying to solve? What did they decide to do? Did they hit any walls, and how did they get past them? What did they actually create or change? What's the most interesting thing that happened? +RULES: +- FAITHFULNESS: Every fact must come from the events. Do not infer or speculate. +- PRESERVATION: Copy file paths, function names, and technical terms VERBATIM. +- MINIMALITY: Keep the title under 60 characters. Keep the summary under 120 characters. -Events: -%s - -Respond with ONLY a JSON object (no prose, no fences): -{"title":"a vivid, specific title for this session","summary":"1-2 sentences — the outcome and why it matters","narrative":"the story of what unfolded — decisions, breakthroughs, struggles, and what was learned","emotional_tone":"neutral|satisfying|frustrating|exciting|concerning","outcome":"success|failure|ongoing|unknown","concepts":["keyword1","keyword2"],"salience":0.7}`, eventsStr) - } else { - prompt = fmt.Sprintf(`You're looking at a stream of activity — moments from someone's work. What's the thread that connects them? What was this person doing, and what's worth remembering about it? +SCHEMA: {title, summary, concepts, salience} +- title: short name for what happened in this session +- summary: one sentence describing the outcome +- concepts: 2-5 keywords from the events (file names, tools, topics) +- salience: 0.3 for routine, 0.5 for normal, 0.7 for important, 0.9 for critical -Events: -%s +Output ONLY the JSON object. No explanation. -Respond with ONLY a JSON object (no prose, no fences): -{"title":"a clear, specific title","summary":"1-2 sentences capturing what happened","narrative":"the story — what was this person working on, what did they accomplish, what's interesting about it","emotional_tone":"neutral|satisfying|frustrating|exciting|concerning","outcome":"success|failure|ongoing|unknown","concepts":["keyword1","keyword2"],"salience":0.5}`, eventsStr) - } +EVENTS: +%s`, eventsStr) + usedLLM := false resp, err := ea.llmProvider.Complete(ctx, llm.CompletionRequest{ Messages: []llm.Message{ - {Role: "system", Content: "You are an episode synthesizer. Summarize groups of events into coherent episodes. Output JSON only."}, + {Role: "system", Content: "You are an episode summarizer. Output JSON only. Never explain, never apologize, never chat. 
Just fill in the JSON fields based on the events."}, {Role: "user", Content: prompt}, }, - MaxTokens: 1024, - Temperature: 0.3, + MaxTokens: 256, + Temperature: 0.2, ResponseFormat: &llm.ResponseFormat{ Type: "json_schema", JSONSchema: &llm.JSONSchema{ Name: "episode_synthesis", Strict: true, - Schema: json.RawMessage(`{"type":"object","properties":{"title":{"type":"string"},"summary":{"type":"string"},"narrative":{"type":"string"},"emotional_tone":{"type":"string"},"outcome":{"type":"string"},"concepts":{"type":"array","items":{"type":"string"}},"salience":{"type":"number"}},"required":["title","summary","narrative","emotional_tone","outcome","concepts","salience"],"additionalProperties":false}`), + Schema: json.RawMessage(`{"type":"object","properties":{"title":{"type":"string"},"summary":{"type":"string"},"concepts":{"type":"array","items":{"type":"string"}},"salience":{"type":"number"}},"required":["title","summary","concepts","salience"],"additionalProperties":false}`), }, }, }) if err != nil { - ea.log.Warn("LLM episode synthesis failed, using fallback", "error", err) - ep.Title = fmt.Sprintf("Session with %d events", len(ep.RawMemoryIDs)) - ep.Summary = ep.Title - ep.Salience = ea.defaultSalience() - ep.Concepts = []string{} + ea.log.Warn("LLM episode synthesis failed, using heuristic fallback", "error", err) + } else if resp.MeanProb > 0 && resp.MeanProb < 0.10 { + ea.log.Warn("LLM episode synthesis confidence too low, using heuristic fallback", + "mean_prob", resp.MeanProb, "min_prob", resp.MinProb) } else { - // Parse LLM response parsed := parseEpisodeSynthesis(resp.Content) ep.Title = parsed.Title ep.Summary = parsed.Summary - ep.Narrative = parsed.Narrative - ep.EmotionalTone = parsed.EmotionalTone - ep.Outcome = parsed.Outcome ep.Concepts = parsed.Concepts ep.Salience = parsed.Salience + usedLLM = true } + if !usedLLM { + // Heuristic fallback: derive from events directly + ep.Title, ep.Summary, ep.Concepts = heuristicEpisodeSynthesis(ep, timeline) + ep.Salience = ea.defaultSalience() + } + + // Enrich fields that the LLM doesn't produce (heuristic for all episodes) + enrichEpisodeFromEvents(ep, timeline) + ep.State = store.EpisodeStateClosed ep.UpdatedAt = time.Now() @@ -452,16 +440,14 @@ Respond with ONLY a JSON object (no prose, no fences): } // Backfill episode_id on encoded memories that came from this episode's raw observations. - // The encoding agent runs faster than episoding, so memories are often encoded before - // they're assigned to an episode. This patches up the linkage after the fact. 
linkedCount := 0 for _, rawID := range ep.RawMemoryIDs { mem, err := ea.store.GetMemoryByRawID(ctx, rawID) if err != nil { - continue // not encoded yet, or encoding failed + continue } if mem.EpisodeID == ep.ID { - continue // already linked + continue } mem.EpisodeID = ep.ID if err := ea.store.UpdateMemory(ctx, mem); err != nil { @@ -474,7 +460,7 @@ Respond with ONLY a JSON object (no prose, no fences): ea.log.Info("backfilled episode_id on encoded memories", "episode_id", ep.ID, "linked", linkedCount) } - // Also populate episode.MemoryIDs for the reverse link + // Populate episode.MemoryIDs for the reverse link var memIDs []string for _, rawID := range ep.RawMemoryIDs { mem, err := ea.store.GetMemoryByRawID(ctx, rawID) @@ -488,7 +474,6 @@ Respond with ONLY a JSON object (no prose, no fences): _ = ea.store.UpdateEpisode(ctx, *ep) } - // Publish event if ea.bus != nil { _ = ea.bus.Publish(ctx, events.EpisodeClosed{ EpisodeID: ep.ID, @@ -507,20 +492,23 @@ Respond with ONLY a JSON object (no prose, no fences): "tone", ep.EmotionalTone, "concepts", len(ep.Concepts), "files", len(ep.FilesModified), - "claude_session", hasMCPEvents, + "llm", usedLLM, ) return nil } -// episodeSynthesis is the LLM response structure. +// maxEventChars is the character budget for event text in the prompt. +// With 2048 context, 256 max_tokens, and ~200 tokens for boilerplate, +// that leaves ~1592 tokens (~6300 chars) for events. +const maxEventChars = 6000 + +// episodeSynthesis is the simplified 4-field LLM response structure. +// emotional_tone, outcome, and narrative are populated heuristically. type episodeSynthesis struct { - Title string `json:"title"` - Summary string `json:"summary"` - Narrative string `json:"narrative"` - EmotionalTone string `json:"emotional_tone"` - Outcome string `json:"outcome"` - Concepts []string `json:"concepts"` - Salience float32 `json:"salience"` + Title string `json:"title"` + Summary string `json:"summary"` + Concepts []string `json:"concepts"` + Salience float32 `json:"salience"` } // parseEpisodeSynthesis extracts JSON from LLM response. @@ -535,36 +523,145 @@ func parseEpisodeSynthesis(response string) episodeSynthesis { "response_preview", agentutil.Truncate(response, 200), ) return episodeSynthesis{ - Title: "Untitled session", - Summary: "Episode synthesis failed — LLM returned unparseable response.", - Salience: 0.5, - EmotionalTone: "neutral", - Outcome: "ongoing", - Concepts: []string{}, + Title: "Untitled session", + Summary: "Episode synthesis failed — LLM returned unparseable response.", + Salience: 0.5, + Concepts: []string{}, } } - // Guard against the LLM returning code or garbage in fields if len(result.Summary) > 500 { result.Summary = result.Summary[:500] + "..." } - if len(result.Narrative) > 2000 { - result.Narrative = result.Narrative[:2000] + "..." - } - // Validate fields if result.Title == "" { result.Title = "Untitled session" } if result.Salience <= 0 { result.Salience = 0.5 } - if result.EmotionalTone == "" { - result.EmotionalTone = "neutral" - } - if result.Outcome == "" { - result.Outcome = "ongoing" - } if result.Concepts == nil { result.Concepts = []string{} } return result } + +// truncateEventsForPrompt keeps first + last events (bookends) and fills +// middle events until the character budget is exhausted. 
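+//
+// Illustrative behavior (not a doctest from this PR): with events
+// ["FIRST", "m1", "m2", "LAST"] and a budget that only fits the bookends,
+// the result is "FIRST\n\nLAST"; middle events are re-added oldest-first
+// until the budget runs out.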
+func truncateEventsForPrompt(events []string, maxChars int) string { + if len(events) == 0 { + return "" + } + if len(events) == 1 { + return agentutil.Truncate(events[0], maxChars) + } + + // Always include first and last + first := events[0] + last := events[len(events)-1] + used := len(first) + len(last) + 4 // 4 for newlines + + var middle []string + for i := 1; i < len(events)-1; i++ { + if used+len(events[i])+2 > maxChars { + break + } + middle = append(middle, events[i]) + used += len(events[i]) + 2 + } + + var b strings.Builder + b.WriteString(first) + b.WriteString("\n\n") + for _, m := range middle { + b.WriteString(m) + b.WriteString("\n\n") + } + b.WriteString(last) + return b.String() +} + +// heuristicEpisodeSynthesis builds title, summary, and concepts from event data +// without LLM assistance. Used as fallback when LLM fails or has low confidence. +func heuristicEpisodeSynthesis(ep *store.Episode, timeline []store.EventEntry) (title, summary string, concepts []string) { + project := ep.Project + if project == "" { + project = "Activity" + } + title = fmt.Sprintf("%s: %d events", project, len(ep.RawMemoryIDs)) + + if len(timeline) > 0 { + summary = timeline[0].Brief + if len(timeline) > 1 { + summary += " ... " + timeline[len(timeline)-1].Brief + } + } else { + summary = title + } + + // Extract concepts from file paths and event types + seen := make(map[string]bool) + for _, entry := range timeline { + if entry.FilePath != "" && !seen[entry.FilePath] { + seen[entry.FilePath] = true + concepts = append(concepts, entry.FilePath) + } + if entry.Type != "" && !seen[entry.Type] { + seen[entry.Type] = true + concepts = append(concepts, entry.Type) + } + if len(concepts) >= 5 { + break + } + } + if concepts == nil { + concepts = []string{} + } + return title, summary, concepts +} + +// enrichEpisodeFromEvents populates emotional_tone, outcome, and narrative +// heuristically from event data. Called for all episodes (both LLM and fallback). 
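+//
+// Precedence note: in the tone scan below, an error keyword breaks out of the
+// loop immediately, while a success keyword keeps scanning, so an error
+// anywhere in the timeline always wins over an earlier success.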
+func enrichEpisodeFromEvents(ep *store.Episode, timeline []store.EventEntry) { + // Build narrative from timeline briefs + if ep.Narrative == "" && len(timeline) > 0 { + var b strings.Builder + for i, entry := range timeline { + if i > 0 { + b.WriteString(" ") + } + b.WriteString(entry.Brief) + if i < len(timeline)-1 { + b.WriteString(".") + } + } + ep.Narrative = agentutil.Truncate(b.String(), 2000) + } + + // Detect emotional tone from event content + if ep.EmotionalTone == "" { + tone := "neutral" + for _, entry := range timeline { + lower := strings.ToLower(entry.Brief) + if strings.Contains(lower, "error") || strings.Contains(lower, "fail") || strings.Contains(lower, "panic") || strings.Contains(lower, "crash") { + tone = "frustrating" + break + } + if strings.Contains(lower, "success") || strings.Contains(lower, "done") || strings.Contains(lower, "complete") || strings.Contains(lower, "fixed") { + tone = "satisfying" + } + } + ep.EmotionalTone = tone + } + + // Detect outcome from last events + if ep.Outcome == "" { + ep.Outcome = "ongoing" + if len(timeline) > 0 { + last := strings.ToLower(timeline[len(timeline)-1].Brief) + if strings.Contains(last, "error") || strings.Contains(last, "fail") { + ep.Outcome = "failure" + } else if strings.Contains(last, "success") || strings.Contains(last, "done") || strings.Contains(last, "complete") { + ep.Outcome = "success" + } + } + } +} diff --git a/internal/agent/episoding/agent_test.go b/internal/agent/episoding/agent_test.go index 0947dcfb..ff7a40a1 100644 --- a/internal/agent/episoding/agent_test.go +++ b/internal/agent/episoding/agent_test.go @@ -2,10 +2,12 @@ package episoding import ( "testing" + + "github.com/appsprout-dev/mnemonic/internal/store" ) func TestParseEpisodeSynthesis_ValidJSON(t *testing.T) { - input := `{"title":"Debugging session","summary":"Fixed episode parsing","narrative":"Investigated and resolved JSON parse failures","emotional_tone":"satisfying","outcome":"success","concepts":["debugging","json","parsing"],"salience":0.8}` + input := `{"title":"Debugging session","summary":"Fixed episode parsing","concepts":["debugging","json","parsing"],"salience":0.8}` result := parseEpisodeSynthesis(input) @@ -34,21 +36,8 @@ func TestParseEpisodeSynthesis_EmptyResponse(t *testing.T) { } } -func TestParseEpisodeSynthesis_TypeMismatch(t *testing.T) { - // This is what the embedded model produces without a dedicated grammar: - // salience as string instead of number - input := `{"title":"Test","summary":"Test","narrative":"Test","emotional_tone":"neutral","outcome":"ongoing","concepts":"keyword","salience":"high"}` - - result := parseEpisodeSynthesis(input) - - // Should fall back to defaults since unmarshal fails on type mismatch - if result.Title != "Untitled session" { - t.Errorf("expected fallback title, got %q", result.Title) - } -} - func TestParseEpisodeSynthesis_MarkdownFenced(t *testing.T) { - input := "```json\n{\"title\":\"Test\",\"summary\":\"s\",\"narrative\":\"n\",\"emotional_tone\":\"neutral\",\"outcome\":\"success\",\"concepts\":[],\"salience\":0.5}\n```" + input := "```json\n{\"title\":\"Test\",\"summary\":\"s\",\"concepts\":[],\"salience\":0.5}\n```" result := parseEpisodeSynthesis(input) @@ -58,7 +47,6 @@ func TestParseEpisodeSynthesis_MarkdownFenced(t *testing.T) { } func TestParseEpisodeSynthesis_MissingFields(t *testing.T) { - // Valid JSON but missing fields — unmarshal succeeds, validation fills defaults input := `{"title":"","summary":"test"}` result := parseEpisodeSynthesis(input) @@ -70,3 +58,157 @@ func 
TestParseEpisodeSynthesis_MissingFields(t *testing.T) { t.Errorf("salience = %f, want %f", result.Salience, 0.5) } } + +func TestTruncateEventsForPrompt_AllFit(t *testing.T) { + events := []string{"event 1", "event 2", "event 3"} + result := truncateEventsForPrompt(events, 1000) + if result == "" { + t.Error("expected non-empty result") + } + // All 3 events should be present + for _, e := range events { + if !contains(result, e) { + t.Errorf("expected %q in result", e) + } + } +} + +func TestTruncateEventsForPrompt_BookendStrategy(t *testing.T) { + events := []string{"FIRST", "middle1", "middle2", "LAST"} + // Budget that only fits first + last + maybe 1 middle + result := truncateEventsForPrompt(events, 30) + if !contains(result, "FIRST") { + t.Error("expected FIRST in result") + } + if !contains(result, "LAST") { + t.Error("expected LAST in result") + } +} + +func TestTruncateEventsForPrompt_SingleEvent(t *testing.T) { + result := truncateEventsForPrompt([]string{"only one"}, 100) + if result != "only one" { + t.Errorf("expected 'only one', got %q", result) + } +} + +func TestTruncateEventsForPrompt_Empty(t *testing.T) { + result := truncateEventsForPrompt(nil, 100) + if result != "" { + t.Errorf("expected empty, got %q", result) + } +} + +func TestHeuristicEpisodeSynthesis(t *testing.T) { + ep := &store.Episode{ + Project: "mnemonic", + RawMemoryIDs: []string{"a", "b", "c"}, + } + timeline := []store.EventEntry{ + {Brief: "Started debugging", Type: "mcp", FilePath: "internal/agent/foo.go"}, + {Brief: "Fixed the bug", Type: "filesystem", FilePath: "internal/agent/bar.go"}, + } + + title, summary, concepts := heuristicEpisodeSynthesis(ep, timeline) + + if title != "mnemonic: 3 events" { + t.Errorf("title = %q", title) + } + if summary == "" { + t.Error("expected non-empty summary") + } + if len(concepts) == 0 { + t.Error("expected non-empty concepts") + } +} + +func TestEnrichEpisodeFromEvents_DetectsErrorTone(t *testing.T) { + ep := &store.Episode{} + timeline := []store.EventEntry{ + {Brief: "Working on feature"}, + {Brief: "ERROR: compilation failed"}, + } + + enrichEpisodeFromEvents(ep, timeline) + + if ep.EmotionalTone != "frustrating" { + t.Errorf("tone = %q, want 'frustrating'", ep.EmotionalTone) + } + if ep.Outcome != "failure" { + t.Errorf("outcome = %q, want 'failure'", ep.Outcome) + } + if ep.Narrative == "" { + t.Error("expected non-empty narrative") + } +} + +func TestEnrichEpisodeFromEvents_DetectsSuccessTone(t *testing.T) { + ep := &store.Episode{} + timeline := []store.EventEntry{ + {Brief: "Implemented feature"}, + {Brief: "Tests complete and passing"}, + } + + enrichEpisodeFromEvents(ep, timeline) + + if ep.EmotionalTone != "satisfying" { + t.Errorf("tone = %q, want 'satisfying'", ep.EmotionalTone) + } + if ep.Outcome != "success" { + t.Errorf("outcome = %q, want 'success'", ep.Outcome) + } +} + +func TestEnrichEpisodeFromEvents_DefaultsNeutral(t *testing.T) { + ep := &store.Episode{} + timeline := []store.EventEntry{ + {Brief: "Editing config file"}, + {Brief: "Updated dependency"}, + } + + enrichEpisodeFromEvents(ep, timeline) + + if ep.EmotionalTone != "neutral" { + t.Errorf("tone = %q, want 'neutral'", ep.EmotionalTone) + } + if ep.Outcome != "ongoing" { + t.Errorf("outcome = %q, want 'ongoing'", ep.Outcome) + } +} + +func TestEnrichEpisodeFromEvents_DoesNotOverwrite(t *testing.T) { + ep := &store.Episode{ + EmotionalTone: "exciting", + Outcome: "success", + Narrative: "Already set", + } + timeline := []store.EventEntry{ + {Brief: "ERROR crash"}, + } + + 
enrichEpisodeFromEvents(ep, timeline) + + // Should not overwrite existing values + if ep.EmotionalTone != "exciting" { + t.Errorf("tone should not be overwritten, got %q", ep.EmotionalTone) + } + if ep.Outcome != "success" { + t.Errorf("outcome should not be overwritten, got %q", ep.Outcome) + } + if ep.Narrative != "Already set" { + t.Errorf("narrative should not be overwritten, got %q", ep.Narrative) + } +} + +func contains(s, substr string) bool { + return len(s) >= len(substr) && (s == substr || len(s) > 0 && containsSubstr(s, substr)) +} + +func containsSubstr(s, sub string) bool { + for i := 0; i <= len(s)-len(sub); i++ { + if s[i:i+len(sub)] == sub { + return true + } + } + return false +} diff --git a/internal/config/config.go b/internal/config/config.go index 630d0f2f..678833d2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -505,9 +505,11 @@ type CLTrainingConfig struct { // CLTriggerConfig holds trigger settings for continuous learning. type CLTriggerConfig struct { - Auto bool `yaml:"auto"` // metacognition auto-trigger (default: false) - Manual bool `yaml:"manual"` // MCP tool trigger (default: true) - TrainingWindow string `yaml:"training_window"` // auto-trigger window, e.g. "02:00-06:00" + Auto bool `yaml:"auto"` // metacognition auto-trigger (default: false) + Manual bool `yaml:"manual"` // MCP tool trigger (default: true) + TrainingWindow string `yaml:"training_window"` // auto-trigger window, e.g. "02:00-06:00" + FailureCooldownHours int `yaml:"failure_cooldown_hours"` // hours to wait after a failed run (default: 24) + MaxConsecutiveFailures int `yaml:"max_consecutive_failures"` // circuit breaker: disable after N failures (default: 3) } // LoggingConfig holds logging settings. @@ -896,9 +898,11 @@ func Default() *Config { CooldownHours: 24, }, Trigger: CLTriggerConfig{ - Auto: true, - Manual: true, - TrainingWindow: "02:00-06:00", + Auto: true, + Manual: true, + TrainingWindow: "02:00-06:00", + FailureCooldownHours: 24, + MaxConsecutiveFailures: 3, }, }, AgentSDK: AgentSDKConfig{ diff --git a/internal/llm/grammar.go b/internal/llm/grammar.go index 81514e97..c1ffdb3d 100644 --- a/internal/llm/grammar.go +++ b/internal/llm/grammar.go @@ -57,20 +57,18 @@ number ::= "-"? ("0" | [1-9] [0-9]*) ("." [0-9]+)? ([eE] [-+]? [0-9]+)? ws ::= ([ \t\n] ws)? ` -// GBNFEpisodeSynthesis constrains output to the episode synthesis schema. -// Fixed key order and typed values prevent the embedded model from producing -// type mismatches (e.g. salience as string) that break json.Unmarshal. -const GBNFEpisodeSynthesis = `root ::= "{" ws title-kv "," ws summary-kv "," ws narrative-kv "," ws emotional-tone-kv "," ws outcome-kv "," ws concepts-kv "," ws salience-kv ws "}" - -title-kv ::= "\"title\"" ws ":" ws string -summary-kv ::= "\"summary\"" ws ":" ws string -narrative-kv ::= "\"narrative\"" ws ":" ws string -emotional-tone-kv ::= "\"emotional_tone\"" ws ":" ws string -outcome-kv ::= "\"outcome\"" ws ":" ws string -concepts-kv ::= "\"concepts\"" ws ":" ws string-array -salience-kv ::= "\"salience\"" ws ":" ws number +// GBNFEpisodeSynthesis constrains output to the simplified 4-field episode schema. +// Fixed key order and typed values prevent type mismatches. Salience is constrained +// to 0.X or 0.XX format to prevent string values like "high". 
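+//
+// A string this grammar accepts (illustrative values):
+//
+//	{"title":"Fix parser","summary":"Resolved JSON errors","concepts":["json"],"salience":0.7}
+//
+// Outputs like "salience":"high" or "salience":1.0 are not derivable, since
+// salience-val only produces "0." followed by one or two digits.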
+const GBNFEpisodeSynthesis = `root ::= "{" ws title-kv "," ws summary-kv "," ws concepts-kv "," ws salience-kv ws "}" + +title-kv ::= "\"title\"" ws ":" ws string +summary-kv ::= "\"summary\"" ws ":" ws string +concepts-kv ::= "\"concepts\"" ws ":" ws string-array +salience-kv ::= "\"salience\"" ws ":" ws salience-val string-array ::= "[" ws "]" | "[" ws string ("," ws string)* ws "]" +salience-val ::= "0." [0-9] [0-9]? string ::= "\"" ( @@ -78,8 +76,6 @@ string ::= "\\" (["\\/bfnrt] | "u" [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F]) )* "\"" -number ::= "-"? ("0" | [1-9] [0-9]*) ("." [0-9]+)? ([eE] [-+]? [0-9]+)? - ws ::= ([ \t\n] ws)? ` diff --git a/internal/mcp/server.go b/internal/mcp/server.go index 92debb7b..3b44b59a 100644 --- a/internal/mcp/server.go +++ b/internal/mcp/server.go @@ -7,6 +7,7 @@ import ( "fmt" "log/slog" "os" + "path/filepath" "sort" "strings" "time" @@ -2901,6 +2902,13 @@ func (srv *MCPServer) handleTrainModel(ctx context.Context, _ map[string]any) (a return nil, fmt.Errorf("training not available — daemon must be running with dreaming agent enabled") } + // Check e-stop file + homeDir, _ := os.UserHomeDir() + estopPath := filepath.Join(homeDir, ".mnemonic", "training.disabled") + if _, statErr := os.Stat(estopPath); statErr == nil { + return toolResult("Training disabled — e-stop file exists at " + estopPath + ". Remove it to re-enable training."), nil + } + result, err := srv.trainingTriggerFn(ctx) if err != nil { return nil, fmt.Errorf("training cycle failed: %w", err) diff --git a/internal/store/sqlite/continuous_learning.go b/internal/store/sqlite/continuous_learning.go index 1eaca1e1..48a203c8 100644 --- a/internal/store/sqlite/continuous_learning.go +++ b/internal/store/sqlite/continuous_learning.go @@ -400,6 +400,51 @@ func (s *SQLiteStore) GetLastTrainingRunTime(ctx context.Context) (time.Time, er return t, nil } +func (s *SQLiteStore) CountConsecutiveFailedTrainingRuns(ctx context.Context) (int, error) { + var count int + // Count failed or stale "requested" runs since the last successful one. + // Stale "requested" = daemon crashed mid-training, result never written. + // Grace period: don't count runs started within 10 minutes (may be in-progress). 
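+	// The COALESCE sentinel below makes the window start at the Unix epoch when
+	// no run has ever completed, so on a fresh install every failed or stale
+	// run counts toward the breaker.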
+ err := s.db.QueryRowContext(ctx, + `SELECT COUNT(*) FROM training_runs + WHERE status IN ('failed', 'requested') + AND started_at > COALESCE( + (SELECT MAX(started_at) FROM training_runs WHERE status = 'completed'), + '1970-01-01T00:00:00Z' + ) + AND started_at < datetime('now', '-10 minutes')`, + ).Scan(&count) + if err != nil { + return 0, fmt.Errorf("counting consecutive failed training runs: %w", err) + } + return count, nil +} + +func (s *SQLiteStore) GetLastTrainingRunEndTime(ctx context.Context) (time.Time, error) { + var raw *string + err := s.db.QueryRowContext(ctx, + `SELECT MAX(COALESCE(completed_at, started_at)) FROM training_runs`, + ).Scan(&raw) + if err != nil { + return time.Time{}, fmt.Errorf("getting last training run end time: %w", err) + } + if raw == nil || *raw == "" { + return time.Time{}, nil + } + formats := []string{ + time.RFC3339Nano, time.RFC3339, + "2006-01-02 15:04:05-07:00", "2006-01-02T15:04:05Z", + "2006-01-02 15:04:05 -0700 MST", + } + for _, f := range formats { + t, parseErr := time.Parse(f, *raw) + if parseErr == nil { + return t, nil + } + } + return time.Time{}, fmt.Errorf("parsing training run time %q", *raw) +} + func (s *SQLiteStore) CountUntrainedExperience(ctx context.Context) (int, error) { var count int err := s.db.QueryRowContext(ctx, diff --git a/internal/store/store.go b/internal/store/store.go index 2fbb6f59..b9598692 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -696,6 +696,10 @@ type ContinuousLearningStore interface { GetLastTrainingRunTime(ctx context.Context) (time.Time, error) CountUntrainedExperience(ctx context.Context) (int, error) MarkExperienceUsedInTraining(ctx context.Context, batchID string, entryIDs []string) error + // CountConsecutiveFailedTrainingRuns counts failed/stale runs since the last success. + CountConsecutiveFailedTrainingRuns(ctx context.Context) (int, error) + // GetLastTrainingRunEndTime returns the end time of the most recent run (any status). + GetLastTrainingRunEndTime(ctx context.Context) (time.Time, error) // Quality drift detection GetEncodingQualityWindow(ctx context.Context, windowSize int) (EncodingQualityWindow, error) diff --git a/internal/store/storetest/mock.go b/internal/store/storetest/mock.go index c96375d1..71e88255 100644 --- a/internal/store/storetest/mock.go +++ b/internal/store/storetest/mock.go @@ -412,6 +412,10 @@ func (MockStore) CountUntrainedExperience(context.Context) (int, error) { return func (MockStore) MarkExperienceUsedInTraining(context.Context, string, []string) error { return nil } +func (MockStore) CountConsecutiveFailedTrainingRuns(context.Context) (int, error) { return 0, nil } +func (MockStore) GetLastTrainingRunEndTime(context.Context) (time.Time, error) { + return time.Time{}, nil +} // --- Lifecycle --- diff --git a/scripts/continuous_train.sh b/scripts/continuous_train.sh index bfda96eb..e1aa0778 100755 --- a/scripts/continuous_train.sh +++ b/scripts/continuous_train.sh @@ -91,6 +91,13 @@ write_failure() { exit 1 } +# Check for e-stop file — training is disabled +ESTOP_FILE="${HOME}/.mnemonic/training.disabled" +if [ -f "$ESTOP_FILE" ]; then + echo "[continuous_train] Training disabled by e-stop file: $ESTOP_FILE" + write_failure "training disabled by e-stop file ($ESTOP_FILE)" +fi + # Stop the daemon to free VRAM echo "[continuous_train] Stopping mnemonic daemon to free VRAM..." 
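# `|| true` on the stop below keeps the script alive when the unit is already
# stopped; the VRAM gate added further down is what actually verifies that the
# GPU memory was released.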
systemctl --user stop mnemonic || true
@@ -102,6 +109,17 @@ if command -v rocm-smi &>/dev/null; then
  echo "[continuous_train] VRAM used after daemon stop: ${VRAM_USED:-unknown}"
 fi

+# Gate: abort if VRAM wasn't properly released (daemon still holding memory)
+if command -v rocm-smi &>/dev/null; then
+  VRAM_USED_BYTES=$(rocm-smi --showmeminfo vram 2>/dev/null | grep "Used" | awk '{print $NF}' | head -1)
+  case "$VRAM_USED_BYTES" in ''|*[!0-9]*) VRAM_USED_MB=0 ;; *) VRAM_USED_MB=$((VRAM_USED_BYTES / 1048576)) ;; esac # non-numeric reading -> 0
+  MAX_USED_MB=1000
+  if [ "$VRAM_USED_MB" -gt "$MAX_USED_MB" ]; then
+    write_failure "VRAM still in use after daemon stop: ${VRAM_USED_MB}MB (max: ${MAX_USED_MB}MB). GPU not released."
+  fi
+  echo "[continuous_train] VRAM available: $((16384 - VRAM_USED_MB))MB"
+fi
+
 # Step 1: Tokenize the batch data
 echo "[continuous_train] Step 1: Preparing training data..."
 PREP_SCRIPT="$PROJECT_DIR/training/scripts/prepare_gemma_finetune_data.py"
@@ -114,7 +132,7 @@ fi
 "$VENV_PYTHON" "$PREP_SCRIPT" \
   --input "$BATCH_PATH" \
   --output-dir "$TOKENIZED_DIR" \
-  --max-seq-len 2048 \
+  --max-seq-len 1024 \
   --eval-ratio 0 \
   2>&1 | tee -a "$LOG"
@@ -140,7 +158,7 @@ fi
   --base-model google/gemma-4-E2B-it \
   --train-data "$TOKENIZED_PATH" \
   --checkpoint-dir "$CHECKPOINT_DIR" \
-  --seq-len 2048 \
+  --seq-len 1024 \
   --steps 500 \
   --batch-size 1 \
   --grad-accum 8 \
diff --git a/training/scripts/train_spokes.py b/training/scripts/train_spokes.py
index e05a11b4..d7b4b59b 100644
--- a/training/scripts/train_spokes.py
+++ b/training/scripts/train_spokes.py
@@ -426,7 +426,9 @@ def train(args):
     torch.cuda.empty_cache()

     # Training loop
+    MAX_CONSECUTIVE_OOM = 3
     model.train()
+    consecutive_oom = 0
     global_step = start_step
     opt_step_count = start_step // args.grad_accum
     losses = []
@@ -466,11 +468,16 @@ def train(args):
             loss = (loss_sum / n_tokens) / args.grad_accum
             loss.backward()
+            consecutive_oom = 0  # successful step resets counter
         except torch.cuda.OutOfMemoryError:
-            # Skip long examples that OOM — free memory and continue
-            print(f"  [OOM] Skipped step {global_step} (seq_len={input_ids.shape[1]})")
+            consecutive_oom += 1
+            print(f"  [OOM] Skipped step {global_step} (seq_len={input_ids.shape[1]}) [{consecutive_oom}/{MAX_CONSECUTIVE_OOM}]")
             torch.cuda.empty_cache()
             global_step += 1
+            if consecutive_oom >= MAX_CONSECUTIVE_OOM:
+                print(f"\n  [FATAL] {MAX_CONSECUTIVE_OOM} consecutive OOM errors — VRAM budget is insufficient.")
+                print("  Aborting. Reduce --seq-len, --batch-size, or free GPU memory.")
+                sys.exit(1)
             continue

         if (global_step + 1) % args.grad_accum == 0:
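For reviewers who want to exercise the new sentinel by hand, a minimal operator-side sketch follows. It is not part of this diff: the estop command name and its argument handling are hypothetical; only the ~/.mnemonic/training.disabled path and the existence-only check mirror TrainingDisabledPath and isTrainingDisabled in training_trigger.go.

// estop.go: hypothetical helper, not included in this PR.
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	home, err := os.UserHomeDir()
	if err != nil {
		fmt.Fprintln(os.Stderr, "cannot resolve home dir:", err)
		os.Exit(1)
	}
	// Same path that TrainingDisabledPath builds in the diff above.
	sentinel := filepath.Join(home, ".mnemonic", "training.disabled")

	arg := ""
	if len(os.Args) > 1 {
		arg = os.Args[1]
	}
	switch arg {
	case "on":
		// Engage: content is irrelevant, the daemon only stats the file.
		if err := os.MkdirAll(filepath.Dir(sentinel), 0o755); err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		if err := os.WriteFile(sentinel, []byte("stopped by operator\n"), 0o644); err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		fmt.Println("training e-stop engaged:", sentinel)
	case "off":
		// Disengage: tolerate the file already being gone.
		if err := os.Remove(sentinel); err != nil && !os.IsNotExist(err) {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		fmt.Println("training e-stop cleared:", sentinel)
	default:
		fmt.Println("usage: estop on|off")
	}
}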