feat(engine): event bus with MCP tools, real SSE stream + cogos#10 fix #16

Merged
chazmaniandinkle merged 3 commits into cogos-dev:main from chazmaniandinkle:feat/event-bus-and-orphan-fix
Apr 21, 2026

@chazmaniandinkle

Summary

Event bus implementation — closes Agent F's #2 critical MCP blindspot. Also fixes cogos#10 (the `EmitLedgerEvent` orphan-file bug) inline, because per Agent N's design the broker hook must live inside `AppendEvent` itself (`consolidate.go`, `cogblock_ledger.go`, `process.go` all call `AppendEvent` directly, not `emitEvent`).

Draft / stacked on PR #11. This branch is based on `feat/ledger-api` (PR #11 head). The event-bus diff begins with commit `73f1071`; commits before that belong to PR #11. Once #11 merges, I'll rebase onto main, force-push, and retarget this PR's base.

Closes #10.

What landed

  • Two MCP tools: `cog_read_events` (historical, bounded) + `cog_tail_events` (live stream, bounded by `max_events` / `max_duration`). Kept separate from `cog_read_ledger` because audit vs. observability semantics differ.
  • Two HTTP routes: `GET /v1/events` (historical) + `GET /v1/events/stream` (SSE, replacing the keepalive-only stub at `serve_compat.go:174`).
  • Broker hook inside `AppendEvent` so every event source (not just `emitEvent`) flows to subscribers.
  • `EmitLedgerEvent` rewrite: signature preserved (no call-site churn), body refactored to build an `EventEnvelope` and call `AppendEvent(workspaceRoot, sessionID, envelope)` with `sessionID = "mcp-client"` fallback. Callers with a live `*Process` (the MCP `toolEmitEvent`) get accurate session attribution via new `(*Process).EmitEvent`.

HTTP contract preservation (critical)

A parallel substrate depends on these root-package routes: `POST /v1/bus/send`, `GET /v1/bus/{bus_id}/events`, `GET /v1/bus/events`. They remain byte-compatible — this PR touches only `internal/engine/`, not the root package.

Added `bus_api_contract_test.go` (3 regression tests) locking the JSON shape of `busSendResponse`, `busSendRequest`, and the bus-events array element keys. Future PRs that reshape the root bus API will break CI.
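A shape-locking check of this kind can be sketched as follows. This is a minimal illustration, not the contents of `bus_api_contract_test.go`: only the key names `ok`, `seq`, and `hash` come from this PR; the Go field types and the `jsonKeys` helper are assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// busSendResponse is a hypothetical stand-in for the root-package type.
// Only the JSON key names are taken from the PR; the field types are assumed.
type busSendResponse struct {
	OK   bool   `json:"ok"`
	Seq  int64  `json:"seq"`
	Hash string `json:"hash"`
}

// jsonKeys (hypothetical helper) marshals v and returns the sorted
// top-level keys of the resulting JSON object.
func jsonKeys(v any) ([]string, error) {
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	var obj map[string]json.RawMessage
	if err := json.Unmarshal(raw, &obj); err != nil {
		return nil, err
	}
	keys := make([]string, 0, len(obj))
	for k := range obj {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys, nil
}

func main() {
	keys, err := jsonKeys(busSendResponse{OK: true, Seq: 1, Hash: "abc"})
	if err != nil {
		panic(err)
	}
	// A contract test would compare this key set against a pinned list
	// and fail if a field is renamed, added, or removed.
	fmt.Println(keys)
}
```

Comparing key sets rather than full payloads keeps the test stable across value changes while still failing on any rename or removal.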

`handleBusAck` deleted — route `POST /v1/bus/{bus_id}/ack` dropped acks on the floor (per Agent N §"Real bugs"). Regression test `TestServeBusAckRemoved` confirms the route returns non-2xx.

Design judgments beyond Agent N's spec

  • Additive broker registry instead of single-slot `currentBroker`. The test suite spins up many parallel `*Process` instances; a single slot races. `RegisterBroker` / `UnregisterBroker` (auto-called on `Close`) lets parallel processes coexist. Production still has exactly one; behavior identical.
  • `GET /v1/events` filter wrapper includes `Source`, `Until`, `Order` (default desc), `next_before` pagination cursor.
  • `cog_tail_events` caps: defaults 100 events / 60s; hard caps 1000 / 10m.
  • SSE `last_event_id` resume: reads both the SSE `Last-Event-ID` header and a `last_event_id` query param, preferring the header.
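The cap and resume rules above can be sketched roughly as below. The numeric limits and the header-over-query preference are taken from this description; the function names, and the choice to treat zero or negative inputs as "unset", are assumptions for illustration only.

```go
package main

import (
	"fmt"
	"time"
)

const (
	defaultMaxEvents   = 100              // default from the PR text
	hardMaxEvents      = 1000             // hard cap from the PR text
	defaultMaxDuration = 60 * time.Second // default from the PR text
	hardMaxDuration    = 10 * time.Minute // hard cap from the PR text
)

// clampTail (hypothetical name) applies the defaults when a value is unset
// (assumed here to mean <= 0) and clamps oversized requests to the hard caps.
func clampTail(maxEvents int, maxDuration time.Duration) (int, time.Duration) {
	if maxEvents <= 0 {
		maxEvents = defaultMaxEvents
	} else if maxEvents > hardMaxEvents {
		maxEvents = hardMaxEvents
	}
	if maxDuration <= 0 {
		maxDuration = defaultMaxDuration
	} else if maxDuration > hardMaxDuration {
		maxDuration = hardMaxDuration
	}
	return maxEvents, maxDuration
}

// pickResumeID (hypothetical name) prefers the Last-Event-ID header value
// and falls back to the last_event_id query parameter.
func pickResumeID(headerValue, queryValue string) string {
	if headerValue != "" {
		return headerValue
	}
	return queryValue
}

func main() {
	n, d := clampTail(0, 0) // unset: defaults apply
	fmt.Println(n, d)
	n, d = clampTail(5000, time.Hour) // oversized: hard caps apply
	fmt.Println(n, d)
	fmt.Println(pickResumeID("42", "17")) // header wins over query param
}
```

In a handler, the two inputs to `pickResumeID` would come from `r.Header.Get("Last-Event-ID")` and `r.URL.Query().Get("last_event_id")`.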

Test plan

  • 14 broker/wiring/helper unit tests — pass
  • 4 `events_query` tests — pass
  • 4 HTTP/SSE/MCP integration tests — pass
  • 3 HTTP-contract regression tests (root package) — pass
  • PR #11 (feat(engine): expose hash-chained ledger via MCP tool and HTTP endpoint) ledger tests still pass (no regression)
  • `go test ./internal/engine/... -short -count=1` → `ok` in 0.93s
  • `go test ./... -short -count=1` — pass
  • `go vet ./...` + `go build ./...` silent
  • `-race` run over broker/HTTP/wiring — clean

Design reference

Agent N's design CogDoc: `~/cog-workspace/.cog/mem/semantic/surveys/2026-04-21-consolidation/agent-N-event-bus-design.cog.md`

…han fix

Adds an in-process EventBroker that fans out ledger appends to live
subscribers, plus MCP tools and HTTP endpoints on top of it. Replaces the
serve_compat SSE stub with a real broker-backed handler, drops the
handleBusAck stub (no consumer), and collapses the cog_emit_event orphan
write into the hash chain.

Architecture
  - EventBroker: ring buffer + per-sub filtered channel, non-blocking
    publish with drop counter, Last-Event-ID resume semantics via the
    ring. Additive package-level registry so AppendEvent publishes to
    every installed broker (single-slot was racy across parallel
    processes in the test suite).
  - AppendEvent hook: calls brokerSnapshot() at the tail so every ledger
    sink — consolidate.go, cogblock_ledger.go, process.go — feeds the
    live bus without per-site wiring.
  - (*Process).EmitEvent: public helper the MCP tool now uses.
  - QueryEvents: observability-flavored wrapper around QueryLedger with
    a Source filter, duration-shorthand since/until, and desc-default
    order.
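
The broker shape described above (ring buffer, non-blocking publish with a drop counter, additive registry) can be sketched as follows. This is a simplified illustration under stated assumptions, not the code in this PR: the `Event` type, all signatures, and `publishAll` are hypothetical, and the names `RegisterBroker`, `UnregisterBroker`, and `brokerSnapshot` are borrowed from the text without knowing their real signatures. Filtering and subscriber caps are omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical minimal stand-in for the real envelope.
type Event struct {
	ID   uint64
	Type string
}

// Broker keeps the last `size` events for replay and fans out to subscribers.
type Broker struct {
	mu      sync.Mutex
	ring    []Event // oldest first, at most `size` entries
	size    int
	subs    map[chan Event]struct{}
	dropped uint64
}

func NewBroker(size int) *Broker {
	if size < 1 {
		size = 1
	}
	return &Broker{size: size, subs: make(map[chan Event]struct{})}
}

// Subscribe replays buffered events with ID > afterID, then receives live ones.
func (b *Broker) Subscribe(afterID uint64, buf int) chan Event {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan Event, buf)
	for _, ev := range b.ring {
		if ev.ID > afterID {
			b.offer(ch, ev)
		}
	}
	b.subs[ch] = struct{}{}
	return ch
}

// offer sends without blocking; a full channel counts as a drop.
// Callers must hold b.mu.
func (b *Broker) offer(ch chan Event, ev Event) {
	select {
	case ch <- ev:
	default:
		b.dropped++
	}
}

// Publish records the event in the ring and offers it to every subscriber.
func (b *Broker) Publish(ev Event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if len(b.ring) == b.size {
		b.ring = b.ring[1:] // evict the oldest entry
	}
	b.ring = append(b.ring, ev)
	for ch := range b.subs {
		b.offer(ch, ev)
	}
}

func (b *Broker) Dropped() uint64 {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.dropped
}

// Additive package-level registry: several brokers may be installed at once.
var (
	regMu   sync.RWMutex
	brokers = map[*Broker]struct{}{}
)

func RegisterBroker(b *Broker)   { regMu.Lock(); brokers[b] = struct{}{}; regMu.Unlock() }
func UnregisterBroker(b *Broker) { regMu.Lock(); delete(brokers, b); regMu.Unlock() }

// brokerSnapshot copies the set so publishing happens outside the registry lock.
func brokerSnapshot() []*Broker {
	regMu.RLock()
	defer regMu.RUnlock()
	out := make([]*Broker, 0, len(brokers))
	for b := range brokers {
		out = append(out, b)
	}
	return out
}

// publishAll stands in for the hook at the tail of AppendEvent.
func publishAll(ev Event) {
	for _, b := range brokerSnapshot() {
		b.Publish(ev)
	}
}

func main() {
	b := NewBroker(8)
	RegisterBroker(b)
	defer UnregisterBroker(b)
	ch := b.Subscribe(0, 4)
	publishAll(Event{ID: 1, Type: "turn.completed"})
	fmt.Println((<-ch).Type, b.Dropped())
}
```

Dropping on a full channel instead of blocking keeps a slow subscriber from stalling the ledger write path; the drop counter makes that loss observable.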

Surface
  - MCP: cog_read_events (historical, bounded) + cog_tail_events (live,
    bounded by max_events / max_duration). Kept separate from
    cog_read_ledger — audit vs observability semantics differ.
  - HTTP: GET /v1/events (non-streaming) + GET /v1/events/stream (SSE
    with id/retry/heartbeat). SSE writer extends the per-write deadline
    so the server's 5-min WriteTimeout doesn't kill long-lived
    connections.
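
One way to extend the write deadline per frame is `http.ResponseController` (Go 1.20+), sketched below. Whether this PR uses that mechanism is not stated here, so treat it as an assumption; the `sseEvent` type, the function names, the 3000 ms retry hint, the 15 s heartbeat, and the 30 s write grace are all hypothetical values chosen for the example. The frame format assumes single-line data such as compact JSON.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// sseEvent is a hypothetical minimal event for this sketch.
type sseEvent struct{ ID, Type, Data string }

// formatSSE renders one SSE frame; Data must not contain newlines.
func formatSSE(id, eventType, data string) string {
	return fmt.Sprintf("id: %s\nevent: %s\ndata: %s\n\n", id, eventType, data)
}

// streamEvents writes frames until the channel closes or the client leaves.
func streamEvents(w http.ResponseWriter, r *http.Request, events <-chan sseEvent) {
	rc := http.NewResponseController(w)
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")

	// write pushes the deadline out immediately before each frame, so a
	// server-wide WriteTimeout does not cut off a long-lived stream.
	// Errors are ignored here; writers without deadline support return
	// http.ErrNotSupported.
	write := func(frame string) {
		_ = rc.SetWriteDeadline(time.Now().Add(30 * time.Second))
		fmt.Fprint(w, frame)
		_ = rc.Flush()
	}

	write("retry: 3000\n\n") // reconnect hint for the client
	heartbeat := time.NewTicker(15 * time.Second)
	defer heartbeat.Stop()
	for {
		select {
		case <-r.Context().Done():
			return
		case ev, ok := <-events:
			if !ok {
				return
			}
			write(formatSSE(ev.ID, ev.Type, ev.Data))
		case <-heartbeat.C:
			write(": keepalive\n\n") // SSE comment line, ignored by clients
		}
	}
}

func main() {
	events := make(chan sseEvent, 1)
	events <- sseEvent{ID: "7", Type: "turn.completed", Data: "{}"}
	close(events)

	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/v1/events/stream", nil)
	streamEvents(rec, req, events)
	fmt.Print(rec.Body.String())
}
```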

cogos#10 fix (orphan EmitLedgerEvent)
  - EmitLedgerEvent now builds an EventEnvelope and routes through
    AppendEvent — per-session directory, hash-chained, visible to the
    broker. Fallback session bucket for callers that only hold *Config
    is "mcp-client".
  - toolEmitEvent in mcp_server.go switched to (*Process).EmitEvent for
    accurate session attribution and source="mcp-client".
  - The flat .cog/ledger/events.jsonl sink is gone.

Deletions
  - serve_compat.handleEventsStream (stub — replaced).
  - serve_compat.handleBusAck (stub, no persistence — deleted per
    Agent N's design; new SSE resume uses Last-Event-ID statelessly).

Tests (all passing)
  - 14 broker unit tests: fan-out, filter (type/session/since), replay,
    backpressure drop, context-cancel unsubscribe, close unblock, race-
    detector-clean concurrent pub/sub, subscriber cap.
  - 2 AppendEvent wiring tests (broker-installed + nil-safe).
  - 4 events_query tests (empty, source filter, order, limit+truncate).
  - 4 HTTP/SSE integration tests: historical, end-to-end SSE, bus-ack
    404, cogos#10 round-trip (event in per-session dir, no orphan file).
  - Updated TestEmitIngestEvent to match new envelope shape.
  - HTTP contract regression tests in root package: busSendResponse
    ({ok,seq,hash}), busSendRequest, and bus events JSON shape — pinned
    so a reshape in the root bus API fails CI.

Contract preservation
  - POST /v1/bus/send, GET /v1/bus/{bus_id}/events, GET /v1/bus/events
    live in the root-package daemon (package main) — this PR doesn't
    touch them, so byte-compatible with cog-sandbox-mcp bridge at
    localhost:7823. Regression tests assert the JSON shapes stay put.

Closes cogos-dev#10
chazmaniandinkle added a commit that referenced this pull request Apr 21, 2026
…e via MCP and HTTP

Closes Agent F gap #4 (chat history / turn replay API) by making the
prompt → response pair a first-class, hash-chained ledger citizen.
Also closes cogos#20 (RecordBlock data-loss) inline — RecordBlock stays
untouched (its metadata-only ingest receipt is used by other call sites)
and the new turn.completed event is the authoritative capture point for
the conversation text that was previously dropped on the floor.

Design: Agent R hybrid (recommendation C, survey
cog://mem/semantic/surveys/2026-04-21-consolidation/agent-R-chat-history-design).

What lands:

- internal/engine/turn_storage.go — TurnRecord type, Process.RecordTurn
  method that (1) writes the full turn to a per-session sidecar JSONL
  at .cog/run/turns/<sessionID>.jsonl and (2) appends a truncated-
  preview turn.completed event to the hash-chained ledger. UTF-8-safe
  truncation; defaults: 8 KB prompt preview, 16 KB response preview.
  NextTurnIndex primes from the sidecar on first access so restarts
  don't reset the counter.

- internal/engine/conversation_query.go — QueryConversation wrapper
  that scans the session ledger for turn.completed events and
  optionally hydrates each with the full sidecar row. Slots in cleanly
  where Agent L's QueryLedger will land; until then, does the O(N)
  session-scoped scan locally (cheap for typical sessions).

- serve.go / serve_anthropic.go — handleChat and handleAnthropicMessages
  now prepare a TurnRecord at the top of the handler, thread it into
  completeChat/streamChat/completeAnthropicMessages/streamAnthropicMessages
  (streaming paths accumulate the response via strings.Builder) and
  RecordTurn after the response finishes flushing. GET /v1/conversation
  is a thin wrapper over QueryConversation with query-param filters
  (session_id, after_turn, before_turn, since, limit, include_full,
  include_tools, order).

- mcp_server.go — new cog_read_conversation MCP tool delegates to
  QueryConversation. Completes the cog_read_* family (ledger, cogdoc,
  events, conversation).

- tool_loop.go — new RunToolLoopWithTranscript variant captures a
  []ToolCallRecord transcript (kernel executions + rejections, with
  duration) so handlers can include per-turn kernel-tool detail in the
  sidecar. The original RunToolLoop signature is preserved as a
  1-line delegator to avoid churn in existing call sites / tests.
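
The UTF-8-safe truncation mentioned for turn_storage.go can be sketched as below. This is a minimal version assuming valid UTF-8 input; the function name and signature are hypothetical and not taken from the commit.

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// truncateUTF8 (hypothetical name) cuts s to at most maxBytes bytes without
// splitting a multibyte rune, and reports whether anything was removed.
func truncateUTF8(s string, maxBytes int) (string, bool) {
	if maxBytes < 0 {
		maxBytes = 0
	}
	if len(s) <= maxBytes {
		return s, false
	}
	cut := maxBytes
	// If the byte at the cut point is a continuation byte, the cut would
	// land inside a rune, so back up to the start of that rune.
	for cut > 0 && !utf8.RuneStart(s[cut]) {
		cut--
	}
	return s[:cut], true
}

func main() {
	// "é" occupies two bytes, so a 2-byte budget cannot include it.
	preview, truncated := truncateUTF8("héllo", 2)
	fmt.Printf("%q %v\n", preview, truncated)
}
```

With the defaults named above, a caller would pass `8*1024` for the prompt preview and `16*1024` for the response preview.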

Integration notes:

- The turn.completed event rides on the standard AppendEvent path, so
  Agent N's broker (PR #16) will fan it out on the live event bus for
  free the moment it lands — cog_tail_events event_type=turn.completed
  becomes live turn replay with no extra code. Agent L's hash chain
  automatically covers conversation history.

- context.assembly events keep emitting as before; the cross-link is
  one-way by timestamp proximity and deferred to a follow-up (kept out
  of scope to keep this PR mechanical).

- RecordBlock and cogblock_ledger.go are intentionally UNTOUCHED.
  cogblock.ingest stays as-is (block_id, kind, message_count) because
  other ingest paths depend on that shape. RecordTurn is the new
  parallel path for the full turn content.

Tests (22 new, all pass under -race):

- TestWriteTurnCompleted{HappyPath,PromptTruncation,ResponseTruncation,
  SidecarAppendDurable,Concurrent} — writer + sidecar + ledger round-trip.
- TestTruncateUTF8Bytes_Boundary — never slices inside a multibyte rune.
- TestNextTurnIndexPrimingFromSidecar — restart scenario.
- TestRecordTurn{StatusDefault,NilGuards,TimestampRespected} — defensive.
- TestQuery{EmptySession,SingleSessionTurnsOrdered,AfterTurnPagination,
  IncludeFullFalse,BadFilterCombo,BadOrder,LimitTruncation,SinceFilter,
  IncludeFullHydratesFromSidecar,ToolCallsIncludedByDefault,
  CountMatchesTruncationFlag,CrossSessionIgnored} — reader semantics.

Full suite (380 tests) passes: go build ./..., go vet ./..., and
go test ./internal/engine/... -short -count=1 -race all clean.

Closes #20
Closes gap #4
@chazmaniandinkle chazmaniandinkle marked this pull request as ready for review April 21, 2026 23:05
@chazmaniandinkle chazmaniandinkle merged commit c481899 into cogos-dev:main Apr 21, 2026
5 checks passed

Development

Successfully merging this pull request may close these issues.

bug: EmitLedgerEvent bypasses hash chaining and writes to flat .cog/ledger/events.jsonl
