Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions internal/agent/consolidation/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,12 +375,17 @@ func (ca *ConsolidationAgent) runCycle(ctx context.Context) (*CycleReport, error
// Publish consolidation completed event
if ca.bus != nil {
_ = ca.bus.Publish(ctx, events.ConsolidationCompleted{
DurationMs: report.Duration.Milliseconds(),
MemoriesProcessed: report.MemoriesProcessed,
MemoriesDecayed: report.MemoriesDecayed,
MergedClusters: report.MergesPerformed,
AssociationsPruned: report.AssociationsPruned,
Ts: time.Now(),
DurationMs: report.Duration.Milliseconds(),
MemoriesProcessed: report.MemoriesProcessed,
MemoriesDecayed: report.MemoriesDecayed,
MergedClusters: report.MergesPerformed,
AssociationsPruned: report.AssociationsPruned,
TransitionedFading: report.TransitionedFading,
TransitionedArchived: report.TransitionedArchived,
PatternsExtracted: report.PatternsExtracted,
PatternsDecayed: report.PatternsDecayed,
NeverRecalledArchived: report.NeverRecalledArchived,
Ts: time.Now(),
})
}

Expand Down
102 changes: 102 additions & 0 deletions internal/api/routes/backfill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package routes

import (
"context"
"log/slog"
"net/http"
"time"

"github.com/appsprout-dev/mnemonic/internal/llm"
"github.com/appsprout-dev/mnemonic/internal/store"
)

// BackfillResponse reports what the backfill operation did.
type BackfillResponse struct {
Total int `json:"total"`
Embedded int `json:"embedded"`
Failed int `json:"failed"`
Skipped int `json:"skipped"`
Errors []string `json:"errors,omitempty"`
}

// HandleBackfillEmbeddings finds memories with empty embeddings and generates them.
func HandleBackfillEmbeddings(s store.Store, provider llm.Provider, log *slog.Logger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Minute)
defer cancel()

// Find all active memories missing embeddings
memories, err := s.ListMemories(ctx, "", 500, 0)
if err != nil {
log.Error("backfill: failed to list memories", "error", err)
writeError(w, http.StatusInternalServerError, "failed to list memories", "STORE_ERROR")
return
}

var missing []store.Memory
for _, m := range memories {
if len(m.Embedding) == 0 {
missing = append(missing, m)
}
}

if len(missing) == 0 {
writeJSON(w, http.StatusOK, BackfillResponse{Total: 0})
return
}

log.Info("backfill: starting embedding backfill", "missing", len(missing))

// Quick sanity check: can we embed at all?
testEmb, testErr := provider.Embed(ctx, "test embedding sanity check")
if testErr != nil {
log.Error("backfill: embedding sanity check failed", "error", testErr)
writeJSON(w, http.StatusOK, BackfillResponse{Total: len(missing), Errors: []string{"sanity check failed: " + testErr.Error()}})
return
}
log.Info("backfill: sanity check passed", "dims", len(testEmb))

resp := BackfillResponse{Total: len(missing)}

for _, mem := range missing {
select {
case <-ctx.Done():
log.Warn("backfill: context cancelled", "embedded", resp.Embedded, "remaining", resp.Total-resp.Embedded-resp.Failed)
writeJSON(w, http.StatusOK, resp)
return
default:
}

// Build embedding text from summary + content (same as encoding agent)
text := mem.Summary + " " + mem.Content
if len(text) > 4000 {
text = text[:4000]
}

embedding, err := provider.Embed(ctx, text)
if err != nil {
resp.Errors = append(resp.Errors, "embed:"+mem.ID[:8]+":"+err.Error())
resp.Failed++
continue
}

if len(embedding) == 0 {
resp.Skipped++
continue
}

// Use targeted update to avoid FK issues with raw_id
if err := s.UpdateEmbedding(ctx, mem.ID, embedding); err != nil {
resp.Errors = append(resp.Errors, "update:"+mem.ID[:8]+":"+err.Error())
resp.Failed++
continue
}

resp.Embedded++
log.Debug("backfill: embedded memory", "id", mem.ID, "dims", len(embedding))
}

log.Info("backfill: completed", "total", resp.Total, "embedded", resp.Embedded, "failed", resp.Failed)
writeJSON(w, http.StatusOK, resp)
}
}
24 changes: 24 additions & 0 deletions internal/api/routes/retrieval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package routes

import (
"log/slog"
"net/http"

"github.com/appsprout-dev/mnemonic/internal/agent/retrieval"
)

// HandleRetrievalStats returns the retrieval agent's in-memory performance stats.
func HandleRetrievalStats(retriever *retrieval.RetrievalAgent, log *slog.Logger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if retriever == nil {
writeJSON(w, http.StatusOK, map[string]any{
"total_queries": 0,
"total_memories_retrieved": 0,
"avg_memories_per_query": 0,
"avg_synthesis_ms": 0,
})
return
}
writeJSON(w, http.StatusOK, retriever.GetStats())
}
}
14 changes: 14 additions & 0 deletions internal/api/routes/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func HandleWebSocket(bus events.Bus, log *slog.Logger) http.HandlerFunc {
events.TypeSystemHealth,
events.TypeWatcherEvent,
events.TypeEpisodeClosed,
events.TypePatternDiscovered,
events.TypeAbstractionCreated,
events.TypeMemoryAmended,
events.TypeSessionEnded,
}

for _, eventType := range eventTypes {
Expand Down Expand Up @@ -208,6 +212,16 @@ func wsConnEventToMessage(evt events.Event) WebSocketMessage {
payload = e
case events.WatcherEvent:
payload = e
case events.EpisodeClosed:
payload = e
case events.PatternDiscovered:
payload = e
case events.AbstractionCreated:
payload = e
case events.MemoryAmended:
payload = e
case events.SessionEnded:
payload = e
default:
// Fallback for unknown event types
payload = map[string]interface{}{}
Expand Down
6 changes: 6 additions & 0 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ func (s *Server) registerRoutes() {
// Activity (watcher-derived concept tracker for MCP sync)
s.mux.HandleFunc("GET /api/v1/activity", routes.HandleActivity(s.deps.Retriever, s.deps.Log))

// Retrieval stats
s.mux.HandleFunc("GET /api/v1/retrieval/stats", routes.HandleRetrievalStats(s.deps.Retriever, s.deps.Log))

// Embedding backfill
s.mux.HandleFunc("POST /api/v1/embeddings/backfill", routes.HandleBackfillEmbeddings(s.deps.Store, s.deps.LLM, s.deps.Log))

// Feedback
s.mux.HandleFunc("POST /api/v1/feedback", routes.HandleFeedback(s.deps.Store, s.deps.Log))

Expand Down
17 changes: 11 additions & 6 deletions internal/events/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,17 @@ func (e ConsolidationStarted) EventTimestamp() time.Time { return e.Ts }

// ConsolidationCompleted is emitted when a consolidation cycle finishes.
type ConsolidationCompleted struct {
DurationMs int64 `json:"duration_ms"`
MemoriesProcessed int `json:"memories_processed"`
MemoriesDecayed int `json:"memories_decayed"`
MergedClusters int `json:"merged_clusters"`
AssociationsPruned int `json:"associations_pruned"`
Ts time.Time `json:"timestamp"`
DurationMs int64 `json:"duration_ms"`
MemoriesProcessed int `json:"memories_processed"`
MemoriesDecayed int `json:"memories_decayed"`
MergedClusters int `json:"merged_clusters"`
AssociationsPruned int `json:"associations_pruned"`
TransitionedFading int `json:"transitioned_fading"`
TransitionedArchived int `json:"transitioned_archived"`
PatternsExtracted int `json:"patterns_extracted"`
PatternsDecayed int `json:"patterns_decayed"`
NeverRecalledArchived int `json:"never_recalled_archived"`
Ts time.Time `json:"timestamp"`
}

func (e ConsolidationCompleted) EventType() string { return TypeConsolidationCompleted }
Expand Down
22 changes: 22 additions & 0 deletions internal/store/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,28 @@ func (s *SQLiteStore) UpdateMemory(ctx context.Context, mem store.Memory) error
}

// UpdateSalience updates the salience of a memory.
func (s *SQLiteStore) UpdateEmbedding(ctx context.Context, id string, embedding []float32) error {
var embeddingBlob []byte
if len(embedding) > 0 {
embeddingBlob = encodeEmbedding(embedding)
}

query := `UPDATE memories SET embedding = ?, updated_at = ? WHERE id = ?`
result, err := s.db.ExecContext(ctx, query, embeddingBlob, time.Now().Format(time.RFC3339), id)
if err != nil {
return fmt.Errorf("failed to update embedding: %w", err)
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}
if rowsAffected == 0 {
return fmt.Errorf("memory with id %s: %w", id, store.ErrNotFound)
}
return nil
}

func (s *SQLiteStore) UpdateSalience(ctx context.Context, id string, salience float32) error {
query := `UPDATE memories SET salience = ?, updated_at = ? WHERE id = ?`

Expand Down
1 change: 1 addition & 0 deletions internal/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ type Store interface {
GetMemoryByRawID(ctx context.Context, rawID string) (Memory, error)
UpdateMemory(ctx context.Context, mem Memory) error
UpdateSalience(ctx context.Context, id string, salience float32) error
UpdateEmbedding(ctx context.Context, id string, embedding []float32) error
UpdateState(ctx context.Context, id string, state string) error
IncrementAccess(ctx context.Context, id string) error
ListMemories(ctx context.Context, state string, limit, offset int) ([]Memory, error)
Expand Down
3 changes: 2 additions & 1 deletion internal/store/storetest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ func (MockStore) GetMemoryByRawID(context.Context, string) (store.Memory, error)
return store.Memory{}, nil
}
func (MockStore) UpdateMemory(context.Context, store.Memory) error { return nil }
func (MockStore) UpdateSalience(context.Context, string, float32) error { return nil }
func (MockStore) UpdateSalience(context.Context, string, float32) error { return nil }
func (MockStore) UpdateEmbedding(context.Context, string, []float32) error { return nil }
func (MockStore) UpdateState(context.Context, string, string) error { return nil }
func (MockStore) IncrementAccess(context.Context, string) error { return nil }
func (MockStore) ListMemories(context.Context, string, int, int) ([]store.Memory, error) {
Expand Down
Loading