diff --git a/cmd/mnemonic/serve.go b/cmd/mnemonic/serve.go index 62335dc5..997172c1 100644 --- a/cmd/mnemonic/serve.go +++ b/cmd/mnemonic/serve.go @@ -685,6 +685,33 @@ func serveCommand(configPath string) { return cfg.MemoryDefaults.SalienceForType(memType) } + // Create MCP session manager for HTTP transport + mcpResolver := config.NewProjectResolver(cfg.Projects) + mcpSessions := mcp.NewSessionManager(mcp.SessionManagerConfig{ + Store: memStore, + Retriever: retriever, + Bus: bus, + Log: log, + Version: Version, + CoachingFile: cfg.Coaching.CoachingFile, + ExcludePatterns: cfg.Perception.Filesystem.ExcludePatterns, + MaxContentBytes: cfg.Perception.Filesystem.MaxContentBytes, + Resolver: mcpResolver, + DaemonURL: fmt.Sprintf("http://%s:%d", cfg.API.Host, cfg.API.Port), + MemDefaults: mcp.MemoryDefaults{ + SalienceGeneral: cfg.MemoryDefaults.InitialSalienceGeneral, + SalienceDecision: cfg.MemoryDefaults.InitialSalienceDecision, + SalienceError: cfg.MemoryDefaults.InitialSalienceError, + SalienceInsight: cfg.MemoryDefaults.InitialSalienceInsight, + SalienceLearning: cfg.MemoryDefaults.InitialSalienceLearning, + SalienceHandoff: cfg.MemoryDefaults.InitialSalienceHandoff, + FeedbackStrengthDelta: cfg.MemoryDefaults.FeedbackStrengthDelta, + FeedbackSalienceBoost: cfg.MemoryDefaults.FeedbackSalienceBoost, + }, + }) + apiDeps.MCPSessions = mcpSessions + defer mcpSessions.Stop(rootCtx) + apiServer := api.NewServer(api.ServerConfig{ Host: cfg.API.Host, Port: cfg.API.Port, diff --git a/internal/api/routes/mcp.go b/internal/api/routes/mcp.go new file mode 100644 index 00000000..29f195ee --- /dev/null +++ b/internal/api/routes/mcp.go @@ -0,0 +1,92 @@ +package routes + +import ( + "encoding/json" + "io" + "log/slog" + "net/http" + + "github.com/appsprout-dev/mnemonic/internal/mcp" +) + +// HandleMCP returns an HTTP handler for the MCP JSON-RPC protocol. +// +// Session lifecycle follows the MCP streamable HTTP transport spec: +// - First request (initialize): no Mcp-Session-Id header needed. +// Server creates a session and returns the ID in the response header. +// - Subsequent requests: client includes Mcp-Session-Id from the +// initialize response. Server routes to the existing session. +// - DELETE with Mcp-Session-Id: explicitly ends the session. +// - Idle sessions are reaped by the session manager after timeout. +func HandleMCP(sm *mcp.SessionManager, log *slog.Logger) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodDelete { + handleMCPDelete(sm, log, w, r) + return + } + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // Read and parse the JSON-RPC request + body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) // 1MB limit + if err != nil { + writeJSONRPCError(w, nil, -32700, "Failed to read request body") + return + } + + var req mcp.JSONRPCRequest + if err := json.Unmarshal(body, &req); err != nil { + writeJSONRPCError(w, nil, -32700, "Parse error") + return + } + + // Resolve session: use client header if present, otherwise create new + clientSessionID := r.Header.Get("Mcp-Session-Id") + srv, sessionKey := sm.GetOrCreate(clientSessionID) + + resp := srv.HandleSingleRequest(r.Context(), &req) + + // Always return the session ID so the client can include it in subsequent requests + w.Header().Set("Mcp-Session-Id", sessionKey) + + // Notifications return nil — respond with 202 Accepted + if resp == nil { + w.WriteHeader(http.StatusAccepted) + return + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(resp); err != nil { + log.Warn("failed to encode MCP HTTP response", "error", err) + } + } +} + +// handleMCPDelete explicitly ends an MCP session. +func handleMCPDelete(sm *mcp.SessionManager, log *slog.Logger, w http.ResponseWriter, r *http.Request) { + sessionID := r.Header.Get("Mcp-Session-Id") + if sessionID == "" { + http.Error(w, "Mcp-Session-Id header is required", http.StatusBadRequest) + return + } + + sm.EndSession(r.Context(), sessionID) + log.Info("MCP session explicitly ended via DELETE", "session_id", sessionID) + w.WriteHeader(http.StatusNoContent) +} + +// writeJSONRPCError writes a JSON-RPC error response. +func writeJSONRPCError(w http.ResponseWriter, id interface{}, code int, message string) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) // JSON-RPC errors are still 200 + _ = json.NewEncoder(w).Encode(map[string]interface{}{ + "jsonrpc": "2.0", + "id": id, + "error": map[string]interface{}{ + "code": code, + "message": message, + }, + }) +} diff --git a/internal/api/server.go b/internal/api/server.go index 8fab0a21..6686ccc8 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -13,6 +13,7 @@ import ( "github.com/appsprout-dev/mnemonic/internal/api/routes" "github.com/appsprout-dev/mnemonic/internal/events" "github.com/appsprout-dev/mnemonic/internal/llm" + "github.com/appsprout-dev/mnemonic/internal/mcp" "github.com/appsprout-dev/mnemonic/internal/store" "github.com/appsprout-dev/mnemonic/internal/web" ) @@ -43,6 +44,7 @@ type ServerDeps struct { ServiceRestarter routes.ServiceRestarter // can be nil if not installed as service PIDRestart routes.PIDRestartFunc // fallback restart when service manager unavailable MCPToolCount int // number of registered MCP tools + MCPSessions *mcp.SessionManager // HTTP MCP session manager (nil = disabled) StartTime time.Time // daemon start time for uptime calculation Log *slog.Logger } @@ -173,6 +175,13 @@ func (s *Server) registerRoutes() { s.mux.HandleFunc("PATCH /api/v1/forum/posts/{id}", routes.HandleUpdateForumPost(s.deps.Store, s.deps.Log)) s.mux.HandleFunc("POST /api/v1/forum/posts/{id}/internalize", routes.HandleInternalizeForumPost(s.deps.Store, s.deps.Bus, s.deps.Log)) + // MCP over HTTP transport (shares daemon's LLM, store, agents — no subprocess needed) + if s.deps.MCPSessions != nil { + mcpHandler := routes.HandleMCP(s.deps.MCPSessions, s.deps.Log) + s.mux.HandleFunc("POST /mcp", mcpHandler) + s.mux.HandleFunc("DELETE /mcp", mcpHandler) + } + // WebSocket s.mux.HandleFunc("GET /ws", routes.HandleWebSocket(s.deps.Bus, s.deps.Log)) diff --git a/internal/mcp/server.go b/internal/mcp/server.go index 5372b66d..87014667 100644 --- a/internal/mcp/server.go +++ b/internal/mcp/server.go @@ -24,14 +24,16 @@ import ( // JSON-RPC 2.0 types -type jsonRPCRequest struct { +// JSONRPCRequest is a JSON-RPC 2.0 request. +type JSONRPCRequest struct { JSONRPC string `json:"jsonrpc"` ID interface{} `json:"id,omitempty"` Method string `json:"method"` Params json.RawMessage `json:"params,omitempty"` } -type jsonRPCResponse struct { +// JSONRPCResponse is a JSON-RPC 2.0 response. +type JSONRPCResponse struct { JSONRPC string `json:"jsonrpc"` ID interface{} `json:"id,omitempty"` Result interface{} `json:"result,omitempty"` @@ -195,7 +197,7 @@ func (srv *MCPServer) Run(ctx context.Context) error { line := scanner.Bytes() - var req jsonRPCRequest + var req JSONRPCRequest if err := json.Unmarshal(line, &req); err != nil { srv.log.Debug("parse error", "error", err) if err := enc.Encode(errorResponse(nil, -32700, "Parse error")); err != nil { @@ -222,8 +224,20 @@ func (srv *MCPServer) Run(ctx context.Context) error { return scanner.Err() } +// HandleSingleRequest processes a single JSON-RPC request and returns the response. +// This is the transport-agnostic entry point used by both stdio (Run) and HTTP transports. +func (srv *MCPServer) HandleSingleRequest(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse { + return srv.handleRequest(ctx, req) +} + +// SessionID returns the server's session ID. +func (srv *MCPServer) SessionID() string { return srv.sessionID } + +// OnSessionEnd performs cleanup when a session ends. Exported for the session manager. +func (srv *MCPServer) OnSessionEnd(ctx context.Context) { srv.onSessionEnd(ctx) } + // handleRequest dispatches the request to the appropriate handler based on method. -func (srv *MCPServer) handleRequest(ctx context.Context, req *jsonRPCRequest) *jsonRPCResponse { +func (srv *MCPServer) handleRequest(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse { switch req.Method { case "initialize": return srv.handleInitialize(req) @@ -239,7 +253,7 @@ func (srv *MCPServer) handleRequest(ctx context.Context, req *jsonRPCRequest) *j } // handleInitialize returns the MCP initialization response. -func (srv *MCPServer) handleInitialize(req *jsonRPCRequest) *jsonRPCResponse { +func (srv *MCPServer) handleInitialize(req *JSONRPCRequest) *JSONRPCResponse { result := map[string]interface{}{ "protocolVersion": "2024-11-05", "capabilities": map[string]interface{}{ @@ -261,7 +275,7 @@ type ToolDefinition struct { } // handleToolsList returns the list of available tools. -func (srv *MCPServer) handleToolsList(req *jsonRPCRequest) *jsonRPCResponse { +func (srv *MCPServer) handleToolsList(req *JSONRPCRequest) *JSONRPCResponse { result := map[string]interface{}{ "tools": allToolDefs(), } @@ -276,7 +290,7 @@ type toolCallParams struct { } // handleToolCall dispatches tool calls to their respective handlers. -func (srv *MCPServer) handleToolCall(ctx context.Context, req *jsonRPCRequest) *jsonRPCResponse { +func (srv *MCPServer) handleToolCall(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse { var params toolCallParams if err := json.Unmarshal(req.Params, ¶ms); err != nil { return errorResponse(req.ID, -32602, "Invalid params") @@ -2328,8 +2342,8 @@ func (srv *MCPServer) handleIngestProject(ctx context.Context, args map[string]i // Helper functions // errorResponse creates a JSON-RPC error response. -func errorResponse(id interface{}, code int, message string) *jsonRPCResponse { - return &jsonRPCResponse{ +func errorResponse(id interface{}, code int, message string) *JSONRPCResponse { + return &JSONRPCResponse{ JSONRPC: "2.0", ID: id, Error: &rpcError{ @@ -2340,8 +2354,8 @@ func errorResponse(id interface{}, code int, message string) *jsonRPCResponse { } // successResponse creates a JSON-RPC success response. -func successResponse(id interface{}, result interface{}) *jsonRPCResponse { - return &jsonRPCResponse{ +func successResponse(id interface{}, result interface{}) *JSONRPCResponse { + return &JSONRPCResponse{ JSONRPC: "2.0", ID: id, Result: result, diff --git a/internal/mcp/server_test.go b/internal/mcp/server_test.go index 060dc76c..2c48be2d 100644 --- a/internal/mcp/server_test.go +++ b/internal/mcp/server_test.go @@ -32,7 +32,7 @@ func TestHandleInitialize(t *testing.T) { logger := slog.New(slog.NewTextHandler(io.Discard, nil)) srv := NewMCPServer(&mockStore{}, nil, &mockBus{}, logger, "test", "", []string{}, 0, nil, "", DefaultMemoryDefaults()) - req := &jsonRPCRequest{ + req := &JSONRPCRequest{ JSONRPC: "2.0", ID: 1, Method: "initialize", @@ -91,7 +91,7 @@ func TestHandleToolsList(t *testing.T) { logger := slog.New(slog.NewTextHandler(io.Discard, nil)) srv := NewMCPServer(&mockStore{}, nil, &mockBus{}, logger, "test", "", []string{}, 0, nil, "", DefaultMemoryDefaults()) - req := &jsonRPCRequest{ + req := &JSONRPCRequest{ JSONRPC: "2.0", ID: 2, Method: "tools/list", @@ -314,7 +314,7 @@ func TestHandleRequestDispatch(t *testing.T) { for _, tc := range tests { t.Run(tc.method, func(t *testing.T) { - req := &jsonRPCRequest{ + req := &JSONRPCRequest{ JSONRPC: "2.0", ID: 1, Method: tc.method, diff --git a/internal/mcp/session.go b/internal/mcp/session.go new file mode 100644 index 00000000..5ffc338b --- /dev/null +++ b/internal/mcp/session.go @@ -0,0 +1,204 @@ +package mcp + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/appsprout-dev/mnemonic/internal/agent/retrieval" + "github.com/appsprout-dev/mnemonic/internal/config" + "github.com/appsprout-dev/mnemonic/internal/events" + "github.com/appsprout-dev/mnemonic/internal/store" +) + +// SessionManager manages MCPServer instances for HTTP transport sessions. +// Each unique session ID gets its own MCPServer with isolated per-session state +// (session memories, recall cache, context suggestions). All sessions share +// the daemon's store, LLM, retrieval agent, and event bus. +type SessionManager struct { + mu sync.Mutex + sessions map[string]*httpSession + + // Shared dependencies (from daemon) + store store.Store + retriever *retrieval.RetrievalAgent + bus events.Bus + log *slog.Logger + version string + coachingFile string + excludePatterns []string + maxContentBytes int + resolver ProjectResolver + daemonURL string + memDefaults MemoryDefaults + + idleTimeout time.Duration // how long before an idle session is expired + stopCh chan struct{} // signals the reaper goroutine to stop +} + +type httpSession struct { + server *MCPServer + lastActive time.Time +} + +// SessionManagerConfig holds configuration for the session manager. +type SessionManagerConfig struct { + Store store.Store + Retriever *retrieval.RetrievalAgent + Bus events.Bus + Log *slog.Logger + Version string + CoachingFile string + ExcludePatterns []string + MaxContentBytes int + Resolver *config.ProjectResolver + DaemonURL string + MemDefaults MemoryDefaults + IdleTimeout time.Duration // default: 30 minutes +} + +// NewSessionManager creates a session manager for HTTP MCP transport. +func NewSessionManager(cfg SessionManagerConfig) *SessionManager { + timeout := cfg.IdleTimeout + if timeout == 0 { + timeout = 30 * time.Minute + } + + sm := &SessionManager{ + sessions: make(map[string]*httpSession), + store: cfg.Store, + retriever: cfg.Retriever, + bus: cfg.Bus, + log: cfg.Log, + version: cfg.Version, + coachingFile: cfg.CoachingFile, + excludePatterns: cfg.ExcludePatterns, + maxContentBytes: cfg.MaxContentBytes, + resolver: cfg.Resolver, + daemonURL: cfg.DaemonURL, + memDefaults: cfg.MemDefaults, + idleTimeout: timeout, + stopCh: make(chan struct{}), + } + + // Start background reaper for idle sessions + go sm.reapLoop() + + return sm +} + +// GetOrCreate returns the MCPServer for a session and its session key. +// If clientSessionID is empty (first request), a new session is created. +// If clientSessionID matches an existing session, that session is returned. +// The returned sessionKey should be sent back to the client in the Mcp-Session-Id header. +func (sm *SessionManager) GetOrCreate(clientSessionID string) (*MCPServer, string) { + sm.mu.Lock() + defer sm.mu.Unlock() + + if clientSessionID != "" { + if s, ok := sm.sessions[clientSessionID]; ok { + s.lastActive = time.Now() + return s.server, clientSessionID + } + } + + // Create new MCPServer for this session + srv := NewMCPServer( + sm.store, sm.retriever, sm.bus, sm.log, + sm.version, sm.coachingFile, sm.excludePatterns, + sm.maxContentBytes, sm.resolver, sm.daemonURL, + sm.memDefaults, + ) + + // Use the MCPServer's generated session ID as the key + key := srv.SessionID() + sm.sessions[key] = &httpSession{ + server: srv, + lastActive: time.Now(), + } + + sm.log.Info("HTTP MCP session created", "session_id", key) + return srv, key +} + +// EndSession explicitly ends a session and cleans up. +func (sm *SessionManager) EndSession(ctx context.Context, sessionID string) { + sm.mu.Lock() + s, ok := sm.sessions[sessionID] + if ok { + delete(sm.sessions, sessionID) + } + sm.mu.Unlock() + + if ok { + s.server.OnSessionEnd(ctx) + sm.log.Info("HTTP MCP session ended", "client_session", sessionID) + } +} + +// ActiveSessions returns the number of active sessions. +func (sm *SessionManager) ActiveSessions() int { + sm.mu.Lock() + defer sm.mu.Unlock() + return len(sm.sessions) +} + +// Stop shuts down the session manager, ending all active sessions. +func (sm *SessionManager) Stop(ctx context.Context) { + close(sm.stopCh) + + sm.mu.Lock() + sessions := make(map[string]*httpSession, len(sm.sessions)) + for k, v := range sm.sessions { + sessions[k] = v + } + sm.sessions = make(map[string]*httpSession) + sm.mu.Unlock() + + for _, s := range sessions { + s.server.OnSessionEnd(ctx) + } + sm.log.Info("session manager stopped", "sessions_ended", len(sessions)) +} + +// reapLoop periodically checks for and expires idle sessions. +func (sm *SessionManager) reapLoop() { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-sm.stopCh: + return + case <-ticker.C: + sm.reapIdle() + } + } +} + +func (sm *SessionManager) reapIdle() { + sm.mu.Lock() + var expired []string + now := time.Now() + for id, s := range sm.sessions { + if now.Sub(s.lastActive) > sm.idleTimeout { + expired = append(expired, id) + } + } + // Remove from map while holding lock + expiredSessions := make([]*httpSession, 0, len(expired)) + for _, id := range expired { + expiredSessions = append(expiredSessions, sm.sessions[id]) + delete(sm.sessions, id) + } + sm.mu.Unlock() + + // Clean up outside the lock + for _, s := range expiredSessions { + s.server.OnSessionEnd(context.Background()) + } + if len(expired) > 0 { + sm.log.Info("reaped idle HTTP MCP sessions", "count", len(expired)) + } +} diff --git a/training/docs/experiment_registry.md b/training/docs/experiment_registry.md index 3788889b..f1b15201 100644 --- a/training/docs/experiment_registry.md +++ b/training/docs/experiment_registry.md @@ -1014,6 +1014,27 @@ Rotation parameter overhead per layer (rank=64): - **Hardware:** Local RX 7800 XT, 16GB VRAM, ROCm 7.2.1. Daemon stopped for training. VRAM budget: ~8 GB base (bf16) + ~132 MB spokes (fp32) + ~264 MB optimizer + activations (gradient checkpointing). Expected to fit within 16 GB. - **Metrics:** Primary: 7-metric faithfulness eval (EPR, FR, TED, CCS, MIH, NP, SC). Secondary: eval loss/PPL, stress_test_hallucination.py (7/7 target), novel schema compliance. Tertiary: inference throughput (tok/s) at RQ4 via llama.cpp. - **Inference plan:** Export via export_qwen35_spokes.py (now parameterized for any Qwen 3.5 size), quantize to RQ4 via rotorq_quantize_gguf.py, benchmark throughput on RX 7800 XT. Expected: ~2.25 GB weights (RQ4), ~60-70 tok/s. -- **Open question:** Should spokes be placed on all 32 layers, or only the 8 full-attention layers? DeltaNet layers use linear attention with recurrent state — spoke adaptation may not be needed there. Could test attention-only spoke placement as a follow-up (EXP-28). +- **Open question:** Should spokes be placed on all 32 layers, or only the 8 full-attention layers? DeltaNet layers use linear attention with recurrent state — spoke adaptation may not be needed there. - **Result:** (pending — blocked on EXP-26 completion) - **Verdict:** (pending) + +### EXP-28: Project Bespoke — Structured Pruning of Gemma 4 31B to Mnemonic's Own Model + +- **Date:** 2026-04-09 +- **Status:** REGISTERED +- **Hypothesis:** Gemma 4 31B (30.7B, 60-layer dense transformer) contains a structured subnetwork of ~1.5-2B parameters that, when extracted via targeted structured pruning and continued pretraining on mnemonic's encoding data, will match or exceed the current Qwen 3.5 2B + spokes system on all faithfulness metrics while running 3-5x faster at inference. +- **Variable:** Model identity. Current system: frozen pretrained Qwen 2B + 25M trainable spoke adapters (someone else's model with our paint). Target: a standalone 1.5-2B model extracted from Gemma 4 31B, purpose-built for mnemonic's tasks (our model). +- **Control:** EXP-26 (Qwen 3.5 2B + spokes, v7 data, 7-metric faithfulness eval). +- **Prediction:** The pruned model matches EXP-26 on all 7 faithfulness metrics (EPR >90%, FR <5%, TED 0%, SC 100%, CCS <0.7, MIH 3/3, NP >95%) and stress test 7/7. Inference speed >200 tok/s on RX 7800 XT (current: 95 tok/s). VRAM <1.5GB (current: ~3GB). If the pruned 2B doesn't beat the full Qwen 2B + spokes on encoding quality, the 31B's extra capacity didn't provide better "lottery tickets" for this task. +- **Method:** Sheared LLaMA (Xia et al., ICLR 2024) adapted for Gemma 4 architecture. Targeted structural pruning with learned masks — jointly prunes layers, attention heads, hidden dimensions, and FFN intermediate dimensions. Followed by continued pretraining on mnemonic encoding data with dynamic batch loading. Progressive targets: 8B → 4B → 2B → 1.5B to find the quality cliff. +- **Config (Phase 1 — full fine-tune baseline):** Gemma 4 31B (all params unfrozen, bf16), full mnemonic task data (v7 + encoding captures), LR TBD (sweep needed), gradient checkpointing. MI300X droplet (192GB HBM3e). Collect per-layer importance metrics. +- **Config (Phase 2 — pruning):** Learned pruning masks on encoding task loss. Target shapes: 20 layers / hidden 2048 / 16 heads / FFN 5504 for ~2B target. 3K-5K mask-learning steps, then 5-10B tokens continued pretraining. MI300X. +- **Config (Phase 3 — local deployment):** Export pruned model as standalone GGUF. Benchmark on RX 7800 XT via llama.cpp. No spoke adapters needed — encoding behavior baked into the model. Optional: add spokes for multi-task (synthesis, retrieval). +- **Data:** V7 encoding dataset (5,292 train / 588 eval) for fine-tuning and pruning. May need additional pretraining tokens (diverse text) for continued pretraining phase. +- **Hardware:** MI300X (192GB) for Phases 1-2. RX 7800 XT (16GB) for Phase 3 and all evaluation. Estimated MI300X cost: $80-160. +- **References:** Sheared LLaMA (arxiv:2310.06694), Lottery Ticket Hypothesis (arxiv:1803.03635), SliceGPT (arxiv:2401.15024), LLM-Pruner (arxiv:2305.11627). Felix-LM design paper. +- **Tracking:** GitHub issue #386 (Project Bespoke epic) +- **Metrics:** Primary: 7-metric faithfulness eval + stress test. Secondary: inference tok/s, VRAM, encoding latency. Tertiary: per-pruning-target quality curves (quality vs model size). +- **Go/no-go gate:** After Phase 2 pruning to 2B: if quality < EXP-26 on >2 faithfulness metrics, STOP. The 31B doesn't provide better subnetworks for this task than the native 2B. +- **Result:** (pending) +- **Verdict:** (pending)