Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions cmd/mnemonic/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,33 @@ func serveCommand(configPath string) {
return cfg.MemoryDefaults.SalienceForType(memType)
}

// Create MCP session manager for HTTP transport
mcpResolver := config.NewProjectResolver(cfg.Projects)
mcpSessions := mcp.NewSessionManager(mcp.SessionManagerConfig{
Store: memStore,
Retriever: retriever,
Bus: bus,
Log: log,
Version: Version,
CoachingFile: cfg.Coaching.CoachingFile,
ExcludePatterns: cfg.Perception.Filesystem.ExcludePatterns,
MaxContentBytes: cfg.Perception.Filesystem.MaxContentBytes,
Resolver: mcpResolver,
DaemonURL: fmt.Sprintf("http://%s:%d", cfg.API.Host, cfg.API.Port),
MemDefaults: mcp.MemoryDefaults{
SalienceGeneral: cfg.MemoryDefaults.InitialSalienceGeneral,
SalienceDecision: cfg.MemoryDefaults.InitialSalienceDecision,
SalienceError: cfg.MemoryDefaults.InitialSalienceError,
SalienceInsight: cfg.MemoryDefaults.InitialSalienceInsight,
SalienceLearning: cfg.MemoryDefaults.InitialSalienceLearning,
SalienceHandoff: cfg.MemoryDefaults.InitialSalienceHandoff,
FeedbackStrengthDelta: cfg.MemoryDefaults.FeedbackStrengthDelta,
FeedbackSalienceBoost: cfg.MemoryDefaults.FeedbackSalienceBoost,
},
})
apiDeps.MCPSessions = mcpSessions
defer mcpSessions.Stop(rootCtx)

apiServer := api.NewServer(api.ServerConfig{
Host: cfg.API.Host,
Port: cfg.API.Port,
Expand Down
92 changes: 92 additions & 0 deletions internal/api/routes/mcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package routes

import (
"encoding/json"
"io"
"log/slog"
"net/http"

"github.com/appsprout-dev/mnemonic/internal/mcp"
)

// HandleMCP returns an HTTP handler for the MCP JSON-RPC protocol.
//
// Session lifecycle follows the MCP streamable HTTP transport spec:
// - First request (initialize): no Mcp-Session-Id header needed.
// Server creates a session and returns the ID in the response header.
// - Subsequent requests: client includes Mcp-Session-Id from the
// initialize response. Server routes to the existing session.
// - DELETE with Mcp-Session-Id: explicitly ends the session.
// - Idle sessions are reaped by the session manager after timeout.
func HandleMCP(sm *mcp.SessionManager, log *slog.Logger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodDelete {
handleMCPDelete(sm, log, w, r)
return
}
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}

// Read and parse the JSON-RPC request
body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) // 1MB limit
if err != nil {
writeJSONRPCError(w, nil, -32700, "Failed to read request body")
return
}

var req mcp.JSONRPCRequest
if err := json.Unmarshal(body, &req); err != nil {
writeJSONRPCError(w, nil, -32700, "Parse error")
return
}

// Resolve session: use client header if present, otherwise create new
clientSessionID := r.Header.Get("Mcp-Session-Id")
srv, sessionKey := sm.GetOrCreate(clientSessionID)

resp := srv.HandleSingleRequest(r.Context(), &req)

// Always return the session ID so the client can include it in subsequent requests
w.Header().Set("Mcp-Session-Id", sessionKey)

// Notifications return nil — respond with 202 Accepted
if resp == nil {
w.WriteHeader(http.StatusAccepted)
return
}

w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(resp); err != nil {
log.Warn("failed to encode MCP HTTP response", "error", err)
}
}
}

// handleMCPDelete explicitly ends an MCP session.
func handleMCPDelete(sm *mcp.SessionManager, log *slog.Logger, w http.ResponseWriter, r *http.Request) {
sessionID := r.Header.Get("Mcp-Session-Id")
if sessionID == "" {
http.Error(w, "Mcp-Session-Id header is required", http.StatusBadRequest)
return
}

sm.EndSession(r.Context(), sessionID)
log.Info("MCP session explicitly ended via DELETE", "session_id", sessionID)
w.WriteHeader(http.StatusNoContent)
}

// writeJSONRPCError writes a JSON-RPC error response.
func writeJSONRPCError(w http.ResponseWriter, id interface{}, code int, message string) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK) // JSON-RPC errors are still 200
_ = json.NewEncoder(w).Encode(map[string]interface{}{
"jsonrpc": "2.0",
"id": id,
"error": map[string]interface{}{
"code": code,
"message": message,
},
})
}
9 changes: 9 additions & 0 deletions internal/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/appsprout-dev/mnemonic/internal/api/routes"
"github.com/appsprout-dev/mnemonic/internal/events"
"github.com/appsprout-dev/mnemonic/internal/llm"
"github.com/appsprout-dev/mnemonic/internal/mcp"
"github.com/appsprout-dev/mnemonic/internal/store"
"github.com/appsprout-dev/mnemonic/internal/web"
)
Expand Down Expand Up @@ -43,6 +44,7 @@ type ServerDeps struct {
ServiceRestarter routes.ServiceRestarter // can be nil if not installed as service
PIDRestart routes.PIDRestartFunc // fallback restart when service manager unavailable
MCPToolCount int // number of registered MCP tools
MCPSessions *mcp.SessionManager // HTTP MCP session manager (nil = disabled)
StartTime time.Time // daemon start time for uptime calculation
Log *slog.Logger
}
Expand Down Expand Up @@ -173,6 +175,13 @@ func (s *Server) registerRoutes() {
s.mux.HandleFunc("PATCH /api/v1/forum/posts/{id}", routes.HandleUpdateForumPost(s.deps.Store, s.deps.Log))
s.mux.HandleFunc("POST /api/v1/forum/posts/{id}/internalize", routes.HandleInternalizeForumPost(s.deps.Store, s.deps.Bus, s.deps.Log))

// MCP over HTTP transport (shares daemon's LLM, store, agents — no subprocess needed)
if s.deps.MCPSessions != nil {
mcpHandler := routes.HandleMCP(s.deps.MCPSessions, s.deps.Log)
s.mux.HandleFunc("POST /mcp", mcpHandler)
s.mux.HandleFunc("DELETE /mcp", mcpHandler)
}

// WebSocket
s.mux.HandleFunc("GET /ws", routes.HandleWebSocket(s.deps.Bus, s.deps.Log))

Expand Down
36 changes: 25 additions & 11 deletions internal/mcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ import (

// JSON-RPC 2.0 types

type jsonRPCRequest struct {
// JSONRPCRequest is a JSON-RPC 2.0 request.
type JSONRPCRequest struct {
JSONRPC string `json:"jsonrpc"`
ID interface{} `json:"id,omitempty"`
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
}

type jsonRPCResponse struct {
// JSONRPCResponse is a JSON-RPC 2.0 response.
type JSONRPCResponse struct {
JSONRPC string `json:"jsonrpc"`
ID interface{} `json:"id,omitempty"`
Result interface{} `json:"result,omitempty"`
Expand Down Expand Up @@ -195,7 +197,7 @@ func (srv *MCPServer) Run(ctx context.Context) error {

line := scanner.Bytes()

var req jsonRPCRequest
var req JSONRPCRequest
if err := json.Unmarshal(line, &req); err != nil {
srv.log.Debug("parse error", "error", err)
if err := enc.Encode(errorResponse(nil, -32700, "Parse error")); err != nil {
Expand All @@ -222,8 +224,20 @@ func (srv *MCPServer) Run(ctx context.Context) error {
return scanner.Err()
}

// HandleSingleRequest processes a single JSON-RPC request and returns the response.
// This is the transport-agnostic entry point used by both stdio (Run) and HTTP transports.
func (srv *MCPServer) HandleSingleRequest(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
return srv.handleRequest(ctx, req)
}

// SessionID returns the server's session ID.
func (srv *MCPServer) SessionID() string { return srv.sessionID }

// OnSessionEnd performs cleanup when a session ends. Exported for the session manager.
func (srv *MCPServer) OnSessionEnd(ctx context.Context) { srv.onSessionEnd(ctx) }

// handleRequest dispatches the request to the appropriate handler based on method.
func (srv *MCPServer) handleRequest(ctx context.Context, req *jsonRPCRequest) *jsonRPCResponse {
func (srv *MCPServer) handleRequest(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
switch req.Method {
case "initialize":
return srv.handleInitialize(req)
Expand All @@ -239,7 +253,7 @@ func (srv *MCPServer) handleRequest(ctx context.Context, req *jsonRPCRequest) *j
}

// handleInitialize returns the MCP initialization response.
func (srv *MCPServer) handleInitialize(req *jsonRPCRequest) *jsonRPCResponse {
func (srv *MCPServer) handleInitialize(req *JSONRPCRequest) *JSONRPCResponse {
result := map[string]interface{}{
"protocolVersion": "2024-11-05",
"capabilities": map[string]interface{}{
Expand All @@ -261,7 +275,7 @@ type ToolDefinition struct {
}

// handleToolsList returns the list of available tools.
func (srv *MCPServer) handleToolsList(req *jsonRPCRequest) *jsonRPCResponse {
func (srv *MCPServer) handleToolsList(req *JSONRPCRequest) *JSONRPCResponse {
result := map[string]interface{}{
"tools": allToolDefs(),
}
Expand All @@ -276,7 +290,7 @@ type toolCallParams struct {
}

// handleToolCall dispatches tool calls to their respective handlers.
func (srv *MCPServer) handleToolCall(ctx context.Context, req *jsonRPCRequest) *jsonRPCResponse {
func (srv *MCPServer) handleToolCall(ctx context.Context, req *JSONRPCRequest) *JSONRPCResponse {
var params toolCallParams
if err := json.Unmarshal(req.Params, &params); err != nil {
return errorResponse(req.ID, -32602, "Invalid params")
Expand Down Expand Up @@ -2328,8 +2342,8 @@ func (srv *MCPServer) handleIngestProject(ctx context.Context, args map[string]i
// Helper functions

// errorResponse creates a JSON-RPC error response.
func errorResponse(id interface{}, code int, message string) *jsonRPCResponse {
return &jsonRPCResponse{
func errorResponse(id interface{}, code int, message string) *JSONRPCResponse {
return &JSONRPCResponse{
JSONRPC: "2.0",
ID: id,
Error: &rpcError{
Expand All @@ -2340,8 +2354,8 @@ func errorResponse(id interface{}, code int, message string) *jsonRPCResponse {
}

// successResponse creates a JSON-RPC success response.
func successResponse(id interface{}, result interface{}) *jsonRPCResponse {
return &jsonRPCResponse{
func successResponse(id interface{}, result interface{}) *JSONRPCResponse {
return &JSONRPCResponse{
JSONRPC: "2.0",
ID: id,
Result: result,
Expand Down
6 changes: 3 additions & 3 deletions internal/mcp/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestHandleInitialize(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
srv := NewMCPServer(&mockStore{}, nil, &mockBus{}, logger, "test", "", []string{}, 0, nil, "", DefaultMemoryDefaults())

req := &jsonRPCRequest{
req := &JSONRPCRequest{
JSONRPC: "2.0",
ID: 1,
Method: "initialize",
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestHandleToolsList(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
srv := NewMCPServer(&mockStore{}, nil, &mockBus{}, logger, "test", "", []string{}, 0, nil, "", DefaultMemoryDefaults())

req := &jsonRPCRequest{
req := &JSONRPCRequest{
JSONRPC: "2.0",
ID: 2,
Method: "tools/list",
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestHandleRequestDispatch(t *testing.T) {

for _, tc := range tests {
t.Run(tc.method, func(t *testing.T) {
req := &jsonRPCRequest{
req := &JSONRPCRequest{
JSONRPC: "2.0",
ID: 1,
Method: tc.method,
Expand Down
Loading