feat(serverevents): Phase 5 — daemon bridge for real OnProjectRemoved e2e#171
…ents bridge Documents the two-path delivery model (real-time stream + 60s heartbeat), dedup LRU policy, auth injection approach, and design decision rationale. References spec.md FR-9/FR-10/NFR-3/NFR-4, plan.md Phase 5, and proto-extensions.md per task AC.
256-entry LRU keyed on (event_type, project_id) using stdlib container/list (doubly-linked) + map. Mark(eventType, projectID) returns true if already seen (duplicate); false if new. Thread-safe via sync.Mutex. No external dependencies — the dedup is 68 LOC. Shared by runEventStream and runSyncTicker to suppress duplicate fan-outs across both delivery paths.
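The dedup cache described above can be sketched roughly as follows. This is a minimal illustration built from the commit message (container/list plus a map, guarded by sync.Mutex), not the actual dedup.go; the type and field names (dedupKey, dedupLRU, newDedupLRU) are hypothetical.

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

// dedupKey identifies one event. Field names are illustrative assumptions.
type dedupKey struct {
	eventType string
	projectID string
}

// dedupLRU is a fixed-capacity "already seen" set with LRU eviction.
type dedupLRU struct {
	mu       sync.Mutex
	capacity int
	order    *list.List                 // front = most recently used
	entries  map[dedupKey]*list.Element // key -> node in order
}

func newDedupLRU(capacity int) *dedupLRU {
	if capacity < 1 {
		capacity = 1 // guard so eviction always has an entry to remove
	}
	return &dedupLRU{
		capacity: capacity,
		order:    list.New(),
		entries:  make(map[dedupKey]*list.Element, capacity),
	}
}

// Mark records the key and reports whether it had already been seen.
func (d *dedupLRU) Mark(eventType, projectID string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()

	k := dedupKey{eventType: eventType, projectID: projectID}
	if el, ok := d.entries[k]; ok {
		d.order.MoveToFront(el) // refresh recency on a duplicate
		return true
	}
	if d.order.Len() >= d.capacity {
		oldest := d.order.Back() // least recently used entry
		d.order.Remove(oldest)
		delete(d.entries, oldest.Value.(dedupKey))
	}
	d.entries[k] = d.order.PushFront(k)
	return false
}

func main() {
	d := newDedupLRU(256)
	fmt.Println(d.Mark("removed", "proj-1")) // first sighting: false
	fmt.Println(d.Mark("removed", "proj-1")) // duplicate: true
}
```

Because both delivery paths would call the same Mark, whichever path sees a removal first wins and the other is suppressed.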
- TestLRU_NewKey: new key returns false
- TestLRU_DuplicateKey: duplicate returns true
- TestLRU_Eviction_AtCapacity: capacity-4 LRU evicts on insert #5
- TestLRU_LeastRecentEvicted: refreshed entry survives, unreferenced entry evicted
- TestLRU_ConcurrentMark: 100 goroutines racing on unique + shared keys
EventsClient interface wraps pb.EngramServiceClient with only the two methods the bridge needs (ProjectEvents streaming RPC + SyncProjectState unary RPC). grpcEventsClient is the production implementation. Enables bridge tests to use an in-process bufconn fake without a real network.
…c ticker
Bridge struct with:
- daemon-instance ID: "${pid}-${startUnix}" (built once at NewBridge)
- Start(ctx): launches runEventStream + runSyncTicker under derived ctx
- Stop(): cancels derived ctx + WaitGroup drain
runEventStream (T042):
- Opens persistent ProjectEvents gRPC stream
- Exponential backoff on error: 1s → 2s → 4s → 60s cap (NFR-3)
- Deduplicates via LRU before fan-out
- ctx cancellation exits cleanly
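The backoff schedule above could look something like the sketch below. It assumes plain doubling with a clamp and a context-aware sleep; backoffMin and backoffMax appear in the review comments, while nextBackoff and sleepCtx are hypothetical helper names.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const (
	backoffMin = 1 * time.Second
	backoffMax = 60 * time.Second
)

// nextBackoff doubles the current delay and clamps it to backoffMax.
func nextBackoff(cur time.Duration) time.Duration {
	next := cur * 2
	if next > backoffMax {
		return backoffMax
	}
	return next
}

// sleepCtx waits for d or until ctx is cancelled.
// It returns false when the wait was cut short by cancellation.
func sleepCtx(ctx context.Context, d time.Duration) bool {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-t.C:
		return true
	}
}

func main() {
	// Print the first few delays of the schedule.
	d := backoffMin
	for i := 0; i < 8; i++ {
		fmt.Println(d)
		d = nextBackoff(d)
	}

	// A cancelled context ends the wait immediately.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	fmt.Println(sleepCtx(ctx, time.Hour))
}
```

Returning a bool from the sleep helper lets a reconnect loop exit as soon as the derived context is cancelled instead of waiting out the remaining delay.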
runSyncTicker (T043):
- 60s ± 5s jitter ticker (NFR-4)
- Calls SyncProjectState with locally tracked project IDs
- Fans out OnProjectRemoved for each removed[] entry; dedup-checked
- Logs unknown[] at WARN
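One way to compute the 60s ± 5s interval is sketched below, assuming a uniform offset and a per-instance random source so that daemons started together do not tick in lockstep. The names heartbeatBase, heartbeatJitter, newRNG, and nextHeartbeat are hypothetical.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	heartbeatBase   = 60 * time.Second
	heartbeatJitter = 5 * time.Second
)

// newRNG returns a random source local to one bridge instance.
func newRNG(seed int64) *rand.Rand {
	return rand.New(rand.NewSource(seed))
}

// nextHeartbeat returns heartbeatBase plus a uniform offset in
// [-heartbeatJitter, +heartbeatJitter].
func nextHeartbeat(rng *rand.Rand) time.Duration {
	// Int63n(2j+1) yields [0, 2j]; subtracting j shifts it to [-j, +j].
	span := int64(2*heartbeatJitter) + 1
	offset := time.Duration(rng.Int63n(span)) - heartbeatJitter
	return heartbeatBase + offset
}

func main() {
	rng := newRNG(time.Now().UnixNano())
	for i := 0; i < 3; i++ {
		fmt.Println(nextHeartbeat(rng))
	}
}
```

Recomputing the interval on every tick, rather than once at startup, keeps instances from drifting back into phase over time.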
Tracked-project source: Option C (own sync.Map seeded by stream events).
Option A (interface change) rejected — frozen interface.
Option B (dispatcher hook) rejected — no OnProjectConnect subscription
surface exists on the dispatcher without making bridge a full EngramModule.
Option C is safe: first stream event seeds the tracker; heartbeat catches
the rest. Fan-out of a missed first event is a safe no-op.
gRPC connection: fresh dial (not shared with engramcore pool). Reason:
engramcore uses UnaryClientInterceptor for auth; streaming RPCs require
auth in call context metadata. Sharing the pool would require adding a
StreamInterceptor to a shared connection, changing its behaviour. The
bridge owns its connection and injects the token via outgoingContext().
All tests use t.Parallel() and an in-process bufconn-backed fake EngramService:
- TestBridge_HappyPath_StreamEvent: PROJECT_EVENT_TYPE_REMOVED fires OnProjectRemoved within 1s
- TestBridge_Reconnect: server drop causes backoff; bridge reconnects and delivers after drop cleared
- TestBridge_DedupAcrossStreamAndHeartbeat: same project from stream then heartbeat fires exactly once
- TestBridge_HeartbeatCatchesMissed: syncProjectState direct call delivers missed removal without stream
- TestBridge_StopExitsCleanly: Stop() returns within 5s NFR-9 budget
- Import internal/handlers/serverevents
- Construct NewBridge(logger, reg, nil) after eng.New succeeds
- Call sevBridge.Start(daemonCtx) before eng.Run (bridge runs for daemon lifetime)
- Call sevBridge.Stop() before pipeline.ShutdownAll on both shutdown paths (normal SIGTERM and engine.Run error exit) so in-flight fan-outs complete before modules tear down
If ENGRAM_SERVER_URL is not set, Start() is a no-op with a WARN log.
Walkthrough
Adds a new serverevents package and a Bridge that receives project-removal events over gRPC and periodically synchronizes state; implements LRU deduplication, tests, and integration into daemon startup and shutdown.
Sequence Diagram(s)
sequenceDiagram
participant Client as Client
participant Bridge as Bridge
participant Server as engram Server
participant Dispatcher as Dispatcher
participant Module as Module
Client->>Bridge: daemon startup -> Start(ctx)
Bridge->>Server: ProjectEvents(stream) (with Bearer token)
alt stream delivers removal
Server-->>Bridge: PROJECT_EVENT_TYPE_REMOVED(projectID)
Bridge->>Bridge: filtering, LRU.dedup.Mark(...)
Bridge->>Dispatcher: DispatchProjectRemoval(projectID)
Dispatcher->>Module: OnProjectRemoved(projectID)
end
loop ~60s ± jitter
Bridge->>Dispatcher: ConnectedProjectIDs()
Bridge->>Server: SyncProjectState(local_ids)
Server-->>Bridge: removed project IDs
Bridge->>Bridge: deduplication and fan-out as above
end
Client->>Bridge: shutdown -> Stop()
Bridge->>Bridge: cancel goroutines, wait, close conn
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Code Review
This pull request introduces a serverevents.Bridge to the engram daemon, enabling it to receive project removal notifications via a gRPC stream and a periodic heartbeat. The implementation includes an LRU cache for event deduplication and lifecycle management within the daemon. Reviewers identified a significant logic error where the heartbeat mechanism is ineffective because it relies on the stream to seed its tracked projects list. Additional feedback points to a gRPC connection leak, the need for localized random seeding to prevent heartbeat synchronization across instances, and a misleading test implementation that obscures the seeding issue.
@coderabbitai review
✅ Actions performed: review triggered.
…resource leak Addresses PR #171 review finding. The Bridge dialed its own grpc.ClientConn in Start() but never closed it on Stop(), leaking the underlying TCP connection and goroutines. It now stores conn in the Bridge struct and closes it after wg.Wait() in Stop(). Test-injected clients manage their own conn lifetime, so conn is nil and the close is skipped.
…r heartbeat tracker source
Resolves CRIT finding on PR #171: the previous implementation seeded a bridge-local sync.Map from ProjectEvents stream REMOVED messages, so projects that were only ever CONNECTED (never REMOVED) never entered the local set. SyncProjectState heartbeat calls therefore went out with an empty local_project_ids list and could not catch events missed during a stream drop, completely defeating the safety-net purpose of the heartbeat path (FR-9).
Fix (Option A, dispatcher helper):
1. Expose Dispatcher.ConnectedProjectIDs() []string, which returns a snapshot of all project IDs with an active session, built from the existing OnProjectConnect/Disconnect tracking state. The dispatcher now maintains a tracked sync.Map alongside the existing draining atomic.Bool; both are mutable fields but neither touches the hot HandleRequest path, preserving design decision D18.
2. Bridge accepts a ProjectTracker interface via NewBridge. Production passes the dispatcher; tests inject fakeTracker for deterministic seeding. Bridge no longer maintains its own tracked sync.Map.
3. Bridge.localProjectIDs() now delegates to tracker.ConnectedProjectIDs() instead of ranging over its own map. An empty list means no active sessions, which is still valid per spec.
4. Test suite updated:
- fakeTracker implements ProjectTracker for injection
- TestBridge_HeartbeatCatchesMissed rewritten to prove the end-to-end contract: seed tracker with a live project ID, server marks that ID as removed, bridge heartbeat fan-out fires. The fake server is enhanced with syncStrict mode that intersects incoming local_project_ids against a serverRemoved set (matching the real engram-server FR-6 semantics) and records every seen ID so the test can assert the bridge actually queried the tracker (syncSawProject helper).
Design decision D18 doc comment revised from "dispatcher is stateless" to "no mutex in hot HandleRequest path". The existing draining atomic.Bool already set the precedent for mutable state outside the hot path; adding a lock-free sync.Map for session tracking continues that pattern.
…ck in Start
CI failure on PR #171 (3 OSes, 3 tests):
--- FAIL: TestBridge_HappyPath_StreamEvent
--- FAIL: TestBridge_Reconnect
--- FAIL: TestBridge_DedupAcrossStreamAndHeartbeat
Root cause: Bridge.Start() checked serverURL first and returned early when the env var was unset, even when a bufconn-backed EventsClient was injected via NewBridge. On CI, ENGRAM_SERVER_URL is never set, so all tests that called bridge.Start(ctx) silently became no-ops and timed out waiting for events that no goroutine would ever deliver. TestBridge_HeartbeatCatchesMissed did not hit this path because it calls bridge.syncProjectState(ctx) directly and bypasses Start() entirely; that is why only the three Start-using tests failed and the new Option A regression test passed.
Fix: invert the order of checks in Start(). A non-nil injected client unconditionally bypasses serverURL validation and is used as-is. Only the production path (client == nil) requires a configured server URL and dials its own connection. The no-op warning path still fires when both client and serverURL are absent.
The bug was masked locally because my dev shell had ENGRAM_SERVER_URL set globally for engram client usage, so the early-return path never triggered during local test runs. CI runners start with a clean env.
No behaviour change in production (client is always nil there).
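The corrected check order can be illustrated with a small decision function. This is a simplified sketch of the ordering described in the commit message, not the real Start(); eventsClient, startMode, and chooseStartMode are hypothetical names.

```go
package main

import "fmt"

// eventsClient stands in for the injected EventsClient (hypothetical here).
type eventsClient interface{}

type startMode int

const (
	modeNoop     startMode = iota // nothing to connect to: warn and return
	modeInjected                  // use the injected client as-is
	modeDial                      // production path: dial serverURL
)

// chooseStartMode checks the injected client first, then the server URL.
func chooseStartMode(client eventsClient, serverURL string) startMode {
	if client != nil {
		return modeInjected // an injected client bypasses URL validation
	}
	if serverURL == "" {
		return modeNoop
	}
	return modeDial
}

func main() {
	fmt.Println(chooseStartMode(struct{}{}, "") == modeInjected)       // test path, env unset
	fmt.Println(chooseStartMode(nil, "") == modeNoop)                  // nothing configured
	fmt.Println(chooseStartMode(nil, "https://example.com") == modeDial) // production path
}
```

Checking the client before the URL is what keeps injected-client tests independent of the ambient environment.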
…essor
Covers the Option A accessor added in d9728e9 for the serverevents bridge heartbeat tracker source. Without these tests the new code path is only exercised transitively via the bridge heartbeat test, which leaves the dispatcher coverage gate at risk of dropping below 75% on this file.
Four cases:
- EmptyTracker: initial state returns empty slice (no sessions yet)
- AfterConnect: OnProjectConnect populates the snapshot with 2 IDs
- AfterDisconnect: OnProjectDisconnect removes the ID from subsequent snapshots, leaving the other session intact
- Snapshot: the returned slice is a point-in-time snapshot; mutations to the underlying tracked map after the call do not affect it (proves the sync.Map.Range + append contract holds)
All 4 tests pass in <1 ms (no goroutines, no I/O).
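The snapshot contract exercised by these tests can be sketched as below. The tracker type and the single-argument hook signatures are assumptions made for illustration; only sync.Map.Range plus append is taken from the commit message.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// tracker is a stand-in for the dispatcher's session tracking (hypothetical).
type tracker struct {
	tracked sync.Map // projectID -> struct{}
}

func (t *tracker) OnProjectConnect(id string)    { t.tracked.Store(id, struct{}{}) }
func (t *tracker) OnProjectDisconnect(id string) { t.tracked.Delete(id) }

// ConnectedProjectIDs copies the current keys into a fresh slice,
// so later changes to the map do not affect the returned snapshot.
func (t *tracker) ConnectedProjectIDs() []string {
	ids := []string{}
	t.tracked.Range(func(k, _ any) bool {
		ids = append(ids, k.(string))
		return true // keep iterating
	})
	sort.Strings(ids) // sorted only to make this sketch's output deterministic
	return ids
}

func main() {
	tr := &tracker{}
	tr.OnProjectConnect("proj-a")
	tr.OnProjectConnect("proj-b")

	snap := tr.ConnectedProjectIDs()
	tr.OnProjectDisconnect("proj-a")

	fmt.Println(snap)                     // snapshot taken before the disconnect
	fmt.Println(tr.ConnectedProjectIDs()) // state after the disconnect
}
```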
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
cmd/engram/main.go (1)
97-98: ⚠️ Potential issue | 🔴 Critical
In graceful-restart, the bridge is not stopped before ShutdownAll.
Currently, in the graceful-restart branch, modules begin teardown while the bridge can still fan out OnProjectRemoved. This is a lifecycle race and can result in calls into partially stopped modules.
💡 Suggested change
- go handleGracefulRestart(logger, pipeline, disp, filepath.Join(dd, "modules"))
+ go handleGracefulRestart(logger, pipeline, disp, sevBridge, filepath.Join(dd, "modules"))
  return "ACK"
@@
  func handleGracefulRestart(
  	logger *slog.Logger,
  	pipeline *lifecycle.Pipeline,
  	disp *dispatcher.Dispatcher,
+ 	sevBridge *serverevents.Bridge,
  	storageDir string,
  ) {
@@
  	// Phase 2 — Snapshot: persist module state.
  	if _, err := pipeline.SnapshotAll(ctx, storageDir, daemonVersion); err != nil {
  		logger.Warn("SnapshotAll error (continuing)", "error", err)
  	}
+ 	// Bridge must stop before module teardown.
+ 	if sevBridge != nil {
+ 		sevBridge.Stop()
+ 	}
+
  	// Phase 3 — Shutdown: clean module teardown.
  	if err := pipeline.ShutdownAll(ctx); err != nil {
  		logger.Warn("ShutdownAll error (continuing)", "error", err)
  	}
Also applies to: 180-206
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmd/engram/main.go` around lines 97 - 98, The graceful-restart flow currently starts module teardown while the bridge (dispatcher) can still fan-out OnProjectRemoved, causing a lifecycle race; update the flow so the dispatcher is quiesced before module ShutdownAll runs: modify handleGracefulRestart (and its caller site where it is invoked) to first stop or disable dispatching (e.g., call a dispatcher stop/quiesce method or set a shutdown-in-progress flag on disp and wait for in-flight events to drain) and only after the bridge is fully stopped begin calling pipeline.ShutdownAll / module teardown; reference handleGracefulRestart, ShutdownAll, disp/dispatcher, and OnProjectRemoved to locate and change the coordination ordering and add any necessary synchronization/wait for drains.
🧹 Nitpick comments (1)
internal/handlers/serverevents/bridge_test.go (1)
222-225: The comment and the implementation of testLogger disagree.
The logger currently does not discard; it writes to stderr, which adds noise in parallel tests. The output can be directed to io.Discard.
💡 Suggested change
  import (
  	"context"
+ 	"io"
  	"log/slog"
  	"net"
  	"os"
@@
  func testLogger() *slog.Logger {
- 	return slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))
+ 	return slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelDebug}))
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/handlers/serverevents/bridge_test.go` around lines 222 - 225, The testLogger function currently writes to os.Stderr causing noisy output in parallel tests; change testLogger to create the slog.Logger with a handler that writes to io.Discard instead of os.Stderr so it truly discards logs. Update the return in testLogger to use slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.LevelDebug}) (import io if needed) so tests remain silent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/handlers/serverevents/bridge.go`:
- Around line 332-343: The heartbeat loop must skip empty project IDs to avoid
broadcasting OnProjectRemoved(""); add a guard that ignores empty strings before
calling b.dedup.Mark and b.fanOutRemoval (e.g., if projectID == "" { continue })
and optionally log a debug message, mirroring the guard already present in
handleEvent; ensure references to b.dedup.Mark, b.logger.*, and b.fanOutRemoval
remain unchanged.
- Around line 353-356: Bridge.fanOutRemoval must protect handler calls from
panics: in the loop that calls Registry.ForEachProjectRemovalAware /
module.ProjectRemovalAware.OnProjectRemoved, wrap each
h.OnProjectRemoved(projectID) call in a guarded block with defer + recover and
error logging (including the projectID and the type/identifier of h) so that one
panicking handler does not interrupt the fan-out to the remaining handlers.
- Around line 191-249: runEventStream currently only increases backoff, so an
intermittent reconnect after a long stable connection still uses the large
backoff; modify consumeStream to return a second boolean (e.g., connected bool)
that you set true immediately after a successful ProjectEvents call (inside
consumeStream, right after b.client.ProjectEvents succeeds), change its
signature to (bool, error) and update the call in runEventStream to receive
(connected, err); then in runEventStream, if connected { backoff = backoffMin }
before logging/doing the sleep so the backoff is reset after any successful
stream open (use the existing backoffMin/backoffMax symbols and adjust error
handling accordingly).
In `@internal/handlers/serverevents/doc.go`:
- Around line 17-21: The documentation refers to the "bridge's own
connection-aware tracker" without naming it; clarify the source of
local_project_ids by stating explicitly that in the current implementation it
comes from ProjectTracker (in production via dispatcher.ConnectedProjectIDs()),
and update the description of runSyncTicker/EngramService.SyncProjectState to
mention the dispatcher.ConnectedProjectIDs() call as the source of the
local_project_ids set, so that the documentation matches the actual implementation.
In `@internal/handlers/serverevents/url.go`:
- Around line 28-39: Change isHTTPS to parse the input URL (use url.Parse) and
return true only when the parsed URL's Scheme equals "https" (case-insensitive)
instead of using strings.HasPrefix; in safeURL always remove user info (u.User =
nil) before returning u.String(), and never return the raw input on parse
errors—return a non-sensitive placeholder (e.g. "<redacted-url>" or an empty
string) so secrets cannot leak in logs; update both isHTTPS and safeURL
accordingly.
- Around line 10-25: The function parseGRPCAddr should validate that the parsed
host is non-empty before building "host:port"; update parseGRPCAddr to check the
result of u.Hostname() (variable host) and return a clear error (e.g., "empty
host in serverURL") if host == "" so callers don’t receive addresses like
":37777" or ":443" and get confusing dial errors.
📒 Files selected for processing (9)
- cmd/engram/main.go
- internal/handlers/serverevents/bridge.go
- internal/handlers/serverevents/bridge_test.go
- internal/handlers/serverevents/client.go
- internal/handlers/serverevents/dedup.go
- internal/handlers/serverevents/dedup_test.go
- internal/handlers/serverevents/doc.go
- internal/handlers/serverevents/url.go
- internal/module/dispatcher/dispatcher.go
… PR #171
Five orthogonal hardening fixes in the serverevents bridge, all surfaced by CodeRabbit on PR #171. Each finding was independently verified against the current code before applying the change.
1. bridge.go runEventStream: backoff reset after successful open. Previously backoff only grew: an early reconnect flap could inflate the window to 60 s, and any later single disconnect after a long stable period would inherit that 60 s wait. consumeStream now returns a bool indicating whether the RPC call itself succeeded at least once; when opened == true, runEventStream resets backoff = backoffMin before the next sleep cycle. Signature change: consumeStream now returns (bool, error).
2. bridge.go syncProjectState: skip empty projectID in heartbeat. handleEvent already filtered empty IDs from the stream path, but the heartbeat loop passed them through unchanged. A malformed or partial server response that included an empty entry in resp.GetRemoved would have fanned out OnProjectRemoved("") to every module. Added the mirror guard: `if projectID == "" { continue }` before dedup + fan-out.
3. bridge.go fanOutRemoval: panic isolation per handler. Registry.ForEachProjectRemovalAware just iterates, so a panic inside one module would crash the bridge goroutine and skip every later handler. Each h.OnProjectRemoved call now runs under defer+recover with an error log including projectID, handler type, and panic value. This matches the panic-isolation discipline the dispatcher already applies to lifecycle callbacks (FR-15).
4. url.go parseGRPCAddr: explicit host validation. A URL with no host silently returned ":37777" or ":443", which would then surface as an opaque dial error at a much later call site. Now returns a clear error "invalid server URL %q: missing host" so diagnostics point directly at the misconfiguration.
5. url.go isHTTPS + safeURL: scheme parsing + credential-safe fallback. isHTTPS previously used strings.HasPrefix(raw, "https"), which would mis-classify inputs like "httpsfoo://bar" as HTTPS. Now parses the URL and checks u.Scheme with case-insensitive comparison. safeURL previously returned the raw input on parse error, so a URL containing user:pass@host credentials would leak into structured logs if it failed to parse cleanly. Now returns the "<invalid-url>" placeholder on parse failure so credentials cannot escape through the logging path.
All 5 fixes verified locally: go build / go vet / gofmt / full test suite green under ENGRAM_SERVER_URL unset. No behavioural change for valid production inputs (valid HTTPS URL, non-empty host, well-formed removals).
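The two url.go fixes (items 4 and 5) might look roughly like the sketch below. The function names, the error text, and the "<invalid-url>" placeholder come from the commit message; the default-port rule (443 for https, 37777 otherwise) is an assumption inferred from the ":37777" / ":443" examples, and passing safeURL(raw) into the error is a choice made for this sketch only.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strings"
)

// isHTTPS reports whether raw parses as a URL whose scheme is exactly "https".
func isHTTPS(raw string) bool {
	u, err := url.Parse(raw)
	return err == nil && strings.EqualFold(u.Scheme, "https")
}

// safeURL returns raw without user info, or a placeholder if it cannot be
// parsed, so credentials never reach the logs.
func safeURL(raw string) string {
	u, err := url.Parse(raw)
	if err != nil {
		return "<invalid-url>"
	}
	u.User = nil
	return u.String()
}

// parseGRPCAddr turns a server URL into "host:port" and rejects empty hosts.
// Default ports are an assumption of this sketch.
func parseGRPCAddr(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", fmt.Errorf("invalid server URL %q", safeURL(raw))
	}
	host := u.Hostname()
	if host == "" {
		return "", fmt.Errorf("invalid server URL %q: missing host", safeURL(raw))
	}
	port := u.Port()
	if port == "" {
		port = "37777"
		if strings.EqualFold(u.Scheme, "https") {
			port = "443"
		}
	}
	return net.JoinHostPort(host, port), nil
}

func main() {
	fmt.Println(isHTTPS("httpsfoo://bar"))                // scheme is "httpsfoo", not https
	fmt.Println(safeURL("https://user:pass@example.com")) // user info stripped
	fmt.Println(parseGRPCAddr("https://"))                // missing host is an error
}
```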
… test
Three small follow-ups to the MAJOR batch in 9f909f7:
1. doc.go: heartbeat path source clarification. The Architecture section used to say local_project_ids came from "the bridge own connection-aware tracker", which was accurate only for the obsolete Option C design. Now explicitly names the ProjectTracker interface and, in production, dispatcher.ConnectedProjectIDs() as the authoritative source. Tests inject a fake tracker.
2. bridge_test.go testLogger: switch stderr to io.Discard. Comment said "discard logger" but the implementation wrote to os.Stderr, leaking debug output across parallel tests. Now truly discards via io.Discard. Removed the obsolete "os" import in the same go.
3. bridge_test.go panicModule + TestBridge_FanOutRemoval_PanicIsolation. Regression guard for the fanOutRemoval defer+recover added in 9f909f7. Registers a panicking module BEFORE a surviving module so the test would fail if the bridge did not isolate the panic and continue fan-out. Verified the test fails without the recover (manually, by reverting the recover block: the test hangs on the survivor channel receive and times out after 1 s).
All 3 pass in a single-pass go test run under io.Discard with ENGRAM_SERVER_URL unset.
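The per-handler panic isolation that this regression test guards can be sketched as follows. The removalHandler interface, its single-argument OnProjectRemoved signature, the slice-based fanOutRemoval, and the two module types are simplified stand-ins for illustration, not the real registry or test code.

```go
package main

import (
	"fmt"
	"io"
	"log/slog"
)

// removalHandler is a simplified stand-in for module.ProjectRemovalAware.
type removalHandler interface {
	OnProjectRemoved(projectID string)
}

// safeNotify calls one handler and converts a panic into an error log.
func safeNotify(logger *slog.Logger, h removalHandler, projectID string) {
	defer func() {
		if r := recover(); r != nil {
			logger.Error("OnProjectRemoved panicked",
				"project_id", projectID,
				"handler", fmt.Sprintf("%T", h),
				"panic", r)
		}
	}()
	h.OnProjectRemoved(projectID)
}

// fanOutRemoval notifies every handler; one panic does not stop the rest.
func fanOutRemoval(logger *slog.Logger, handlers []removalHandler, projectID string) {
	for _, h := range handlers {
		safeNotify(logger, h, projectID)
	}
}

// panicModule always panics; recordingModule remembers what it was told.
type panicModule struct{}

func (panicModule) OnProjectRemoved(string) { panic("boom") }

type recordingModule struct{ seen []string }

func (m *recordingModule) OnProjectRemoved(id string) { m.seen = append(m.seen, id) }

// discardLogger drops all log output, as in the updated testLogger.
func discardLogger() *slog.Logger {
	return slog.New(slog.NewTextHandler(io.Discard, nil))
}

func main() {
	rec := &recordingModule{}
	// The panicking module is registered before the survivor on purpose.
	fanOutRemoval(discardLogger(), []removalHandler{panicModule{}, rec}, "proj-1")
	fmt.Println(rec.seen)
}
```

Putting the recover in a per-call helper, rather than around the whole loop, is what lets the loop continue to the next handler after a panic.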
Phase 6 release ceremony for the v4.4.0 train. Bumps the unified engram-server + plugin version after the 5-PR train completed:
- #167: Phase B-1 plumbing tenant (loom integration)
- #168: Phase 2 obs.MeterFor helper + loom OTel wiring
- #169: Phase 3 4 loom_* tools + CLI worker with allowlist
- #170: Phase 4 server-side gRPC proto extensions + soft-delete reaper
- #171: Phase 5 daemon serverevents bridge for real OnProjectRemoved e2e
Per Constitution §15 the daemon version and plugin version must move together, so this commit bumps:
- cmd/engram/main.go daemonVersion "v4.3.0" → "v4.4.0". Reported to gRPC Initialize + logged in structured output.
- plugin/engram/.claude-plugin/plugin.json version "4.3.0" → "4.4.0". Drives Claude Code plugin cache invalidation on /reload-plugins. Without this bump the marketplace update would not be detected.
No behaviour change, no test change. Release notes + git tag + gh release + marketplace sync land in subsequent steps.
Summary
Phase 5 of the engram v4.4.0 loom integration train — the FINAL implementation PR. After this merges, only the v4.4.0 tag ceremony remains (no more code).
Adds the daemon-side serverevents bridge that consumes engram-server's new ProjectEvents gRPC stream (added in PR #170) and fans out OnProjectRemoved callbacks to all ProjectRemovalAware modules in real time, with a 60s SyncProjectState heartbeat as the eventually-consistent backup.
Implements spec FR-9 (serverevents bridge) + FR-10 (real OnProjectRemoved end-to-end) + US2 (P1 project removal e2e) + US4 (P2 daemon reconnects transparently) + NFR-3 (reconnect < 30s) + NFR-4 (heartbeat 60s ± 5s jitter) + NFR-9 (bridge stop within graceful-restart budget).
What ships
New package internal/handlers/serverevents/ (~1033 LOC prod + tests)
- doc.go (48 LOC): package-level docs referencing plan.md §Phase 5 and proto-extensions.md; documents reconnect strategy + dedup policy.
- dedup.go (72 LOC): custom 256-entry LRU keyed on (event_type, project_id). map + container/list implementation. Mark(eventType, projectId) bool returns true if the key was already seen. LRU eviction on insert when full.
- dedup_test.go (100 LOC): 5 parallel tests: new key, duplicate key, eviction at capacity, least-recent evicted, concurrent Mark with 100 goroutines.
- client.go (47 LOC): thin EventsClient interface over pb.EngramServiceClient for test injection. Production constructor wraps the real client; tests inject a fake.
- bridge.go (323 LOC): Bridge struct + Start(ctx, registry, conn) + Stop(). Two lifecycle goroutines:
  - runEventStream: persistent ProjectEvents stream with exponential backoff (1s → 60s cap). Dedup-checks every event before fan-out via registry.ForEachProjectRemovalAware.
  - runSyncTicker: 60s ± 5s jitter ticker calling SyncProjectState. Collects local project IDs from bridge-maintained tracker, reconciles against server, dedup-checks every removed[] entry before fan-out.
- bridge_test.go (409 LOC): 5 tests via bufconn fake server:
  - TestBridge_HappyPath_StreamEvent: event fires OnProjectRemoved within 1s
  - TestBridge_Reconnect: bridge backs off and reconnects after server drop
  - TestBridge_DedupAcrossStreamAndHeartbeat: same event via both paths → fires exactly once
  - TestBridge_HeartbeatCatchesMissed: stream dead, heartbeat ticks, OnProjectRemoved fires
  - TestBridge_StopExitsCleanly: Stop() unsubscribes + cancels goroutines within NFR-9 budget (5s)
- url.go (34 LOC): helper for URL parsing of the engram-server endpoint (shared with dial logic).
Wiring
- cmd/engram/main.go (+16): bridge started after pipeline.Start succeeds and before the eng.Run blocking call. On shutdown, Bridge.Stop() fires before pipeline.ShutdownAll so in-flight event fan-outs complete before modules tear down.
Key architectural decisions (per plan.md §Phase 5 + implementation exploration)
Tracked-projects source: Option C (bridge-maintained sync.Map)
Considered three options for where SyncProjectState gets its local-project-IDs snapshot:
- Option A: add TrackedProjectIDs() []string to the module.ProjectLifecycle interface. REJECTED: the interface is frozen as of Phase A (v4.3.0) per the semver discipline; changing it would require a Phase A spec amendment and would ripple through existing mocks.
- Option B: hook into the dispatcher's OnProjectConnect/OnProjectDisconnect hooks. REJECTED: the dispatcher exposes no such subscription surface short of making the bridge a full EngramModule, and making it a module would conflate lifecycle concerns.
- Option C: bridge-maintained sync.Map seeded by incoming stream events. Safe trade-off: if the server removes a project before the daemon ever saw any event for it, the fan-out hits no handlers, which is a no-op. The heartbeat path catches anything the stream misses.
Documented in the T038 commit and in the package doc comment.
Daemon-instance ID
fmt.Sprintf("%d-%d", os.Getpid(), time.Now().Unix()), built once in NewBridge() and cached. Matches the format suggested in proto-extensions.md §"SyncProjectStateRequest.client_id".
gRPC connection: fresh dial (NOT shared with engramcore pool)
engramcore's gRPC pool (added in PR #167) uses a UnaryClientInterceptor for the auth token. The serverevents bridge needs streaming (for ProjectEvents) and unary (for SyncProjectState) RPCs. The server's auth interceptor is unary-only, so sharing engramcore's pool would require adding a StreamInterceptor and changing pool-level behaviour, risking engramcore regression.
Resolution: the bridge does a fresh grpc.Dial to the same server URL (parsed via url.go) and injects the token via metadata.AppendToOutgoingContext in outgoingContext() for every call. The auth bytes on the wire are identical to what engramcore sends, just attached via metadata instead of an interceptor.
This keeps the two tenants' gRPC lifecycles independent: if the bridge connection flaps, engramcore keeps working.
Dedup key normalization
Both the stream path and the heartbeat path use the string "removed" as the event-type component of the dedup LRU key. The stream carries a proto enum value (ProjectEventType.PROJECT_EVENT_TYPE_REMOVED); the heartbeat has no such enum (it sees removed[] as a flat list of project IDs). Normalizing to "removed" ensures cross-path dedup works even though the two code paths see different value types.
Verification
- go build ./...: clean
- go vet ./...: clean
- go test ./internal/handlers/serverevents/... -count=1: 10/10 tests PASS (1.09s)
- go test ./... -count=1 -timeout 300s: all 45 packages PASS, zero regressions
Anti-stub check: replacing the runEventStream body with return would fail TestBridge_HappyPath_StreamEvent. Replacing the dedup.Mark body with return false would fail TestBridge_DedupAcrossStreamAndHeartbeat.
Compatibility
ProjectRemovalAware.OnProjectRemoved is what the bridge fans out to
7 atomic commits
(Exact commit hashes will be in the merge squash commit after review.)
1. feat(serverevents): package skeleton + doc.go (T045)
2. feat(serverevents): 256-entry LRU dedup cache (T039 + T040)
3. feat(serverevents): EventsClient interface for test injection (T041)
4. feat(serverevents): Bridge struct + runEventStream + runSyncTicker (T038 + T042 + T043)
5. test(serverevents): url helper (supporting helper)
6. test(serverevents): 5 bridge tests via bufconn fake server (T044)
7. feat(engram): wire serverevents Bridge after pipeline.Start (T046)
Related
- .agent/specs/loom-integration/spec.md FR-9 + FR-10 + US2 + US4 + NFR-3/4/9 (gitignored)
- .agent/specs/loom-integration/plan.md §Phase 5 (gitignored)
- .agent/specs/loom-integration/tasks.md T038-T046 + G005 (gitignored)
f011e22)b3dc577)ff57fec)d324f01)gh release create, sync marketplace
Summary by CodeRabbit
New features
Tests
Improvements