From 6b3e1eb9e701dcfa17337e15c786796a740f32df Mon Sep 17 00:00:00 2001 From: MarioCadenas Date: Mon, 27 Apr 2026 15:34:39 +0200 Subject: [PATCH] fix(appkit): bind SSE streams to creator and reject cross-user reconnects The StreamManager registry was a global lookup keyed only by streamId. Any request that supplied a known or guessable streamId could attach to an existing stream via _attachToExistingStream and receive its events, because there was no authorization step on reconnection. In the Genie plugin this was directly exposed: requestId comes from the client query string and is passed through as streamId, so guessing or replaying another user's requestId leaked their conversation events. Fix: thread the calling principal's user key (resolved by Plugin.executeStream the same way it's used for cache scoping) through to the stream manager and store it on the stream entry as ownerKey. On reconnection, only attach when the requesting caller's owner key matches the stream's owner key; otherwise emit a STREAM_FORBIDDEN error and end the connection. This applies to every plugin that uses executeStream, including Genie and Analytics, with no per-plugin changes required. Signed-off-by: MarioCadenas --- packages/appkit/src/plugin/plugin.ts | 12 +- .../appkit/src/plugin/tests/plugin.test.ts | 4 + packages/appkit/src/stream/stream-manager.ts | 20 ++- .../appkit/src/stream/tests/stream.test.ts | 115 ++++++++++++++++++ packages/appkit/src/stream/types.ts | 7 ++ 5 files changed, 154 insertions(+), 4 deletions(-) diff --git a/packages/appkit/src/plugin/plugin.ts b/packages/appkit/src/plugin/plugin.ts index 5173cb612..ed3bc3b5c 100644 --- a/packages/appkit/src/plugin/plugin.ts +++ b/packages/appkit/src/plugin/plugin.ts @@ -443,8 +443,16 @@ export abstract class Plugin< } }; - // stream the result to the client - await this.streamManager.stream(res, asyncWrapperFn, streamConfig); + // stream the result to the client. The effective user key is forwarded + // to the stream manager so that reconnections to existing streamIds are + // bound to the original creator (prevents cross-user stream takeover via + // guessed/leaked IDs). + await this.streamManager.stream( + res, + asyncWrapperFn, + streamConfig, + effectiveUserKey, + ); } /** diff --git a/packages/appkit/src/plugin/tests/plugin.test.ts b/packages/appkit/src/plugin/tests/plugin.test.ts index 440579d79..2ea5623c7 100644 --- a/packages/appkit/src/plugin/tests/plugin.test.ts +++ b/packages/appkit/src/plugin/tests/plugin.test.ts @@ -295,6 +295,10 @@ describe("Plugin", () => { mockResponse, expect.any(Function), {}, + // The plugin forwards the resolved user key as the 4th argument to + // bind the stream to its creator. The test passes `false` as an + // explicit override, which propagates through `userKey ?? getCurrentUserId()`. + false, ); }); diff --git a/packages/appkit/src/stream/stream-manager.ts b/packages/appkit/src/stream/stream-manager.ts index 8b511fac8..997794e6d 100644 --- a/packages/appkit/src/stream/stream-manager.ts +++ b/packages/appkit/src/stream/stream-manager.ts @@ -31,6 +31,7 @@ export class StreamManager { res: IAppResponse, handler: (signal: AbortSignal) => AsyncGenerator, options?: StreamConfig, + ownerKey?: string, ): Promise { const { streamId } = options || {}; @@ -45,14 +46,27 @@ export class StreamManager { // handle reconnection if (streamId && StreamValidator.validateStreamId(streamId)) { const existingStream = this.streamRegistry.get(streamId); - // if stream exists, attach to it if (existingStream) { + // Enforce per-user binding: the stream's owner key must match the + // requesting caller's owner key. This prevents cross-user stream + // takeover via guessed/leaked stream IDs (the SSE registry was + // previously a global lookup with no authorization step). + if (existingStream.ownerKey !== ownerKey) { + this.sseWriter.writeError( + res, + randomUUID(), + "Stream not found or access denied", + SSEErrorCode.STREAM_FORBIDDEN, + ); + res.end(); + return; + } return this._attachToExistingStream(res, existingStream, options); } } // if stream does not exist, create a new one - return this._createNewStream(res, handler, options); + return this._createNewStream(res, handler, options, ownerKey); } // abort all active operations @@ -143,6 +157,7 @@ export class StreamManager { res: IAppResponse, handler: (signal: AbortSignal) => AsyncGenerator, options?: StreamConfig, + ownerKey?: string, ): Promise { const streamId = options?.streamId ?? randomUUID(); @@ -177,6 +192,7 @@ export class StreamManager { // create stream entry const streamEntry: StreamEntry = { streamId, + ownerKey, generator: handler(combinedSignal), eventBuffer, clients: new Set([res]), diff --git a/packages/appkit/src/stream/tests/stream.test.ts b/packages/appkit/src/stream/tests/stream.test.ts index fae54289c..09c6716de 100644 --- a/packages/appkit/src/stream/tests/stream.test.ts +++ b/packages/appkit/src/stream/tests/stream.test.ts @@ -507,6 +507,121 @@ describe("StreamManager", () => { expect(hasNewStream).toBe(false); }); + test("rejects reconnect from a different owner", async () => { + const streamId = "owner-bound-123"; + + const { mockRes: mockRes1 } = createMockResponse(); + + async function* generator1() { + for (let i = 0; i < 5; i++) { + yield { type: "message", data: `secret-${i}` }; + } + } + + await streamManager.stream( + mockRes1 as any, + generator1, + { streamId }, + "user-alice", + ); + + const { mockRes: mockRes2, events: events2 } = createMockResponse(); + + async function* generator2() { + yield { type: "should-not-run" }; + } + + await streamManager.stream( + mockRes2 as any, + generator2, + { streamId }, + "user-bob", + ); + + // Bob must not see any of Alice's events or replays. + expect(events2.some((e) => e.includes("secret-"))).toBe(false); + expect(events2.some((e) => e.includes("should-not-run"))).toBe(false); + + // A STREAM_FORBIDDEN error must be emitted and the connection ended. + expect(events2.some((e) => e.includes("STREAM_FORBIDDEN"))).toBe(true); + expect(mockRes2.end).toHaveBeenCalled(); + }); + + test("allows reconnect from the same owner", async () => { + const streamId = "owner-bound-456"; + + const { mockRes: mockRes1, events: events1 } = createMockResponse(); + + async function* generator1() { + yield { type: "message", data: "event-1" }; + yield { type: "message", data: "event-2" }; + yield { type: "message", data: "event-3" }; + } + + await streamManager.stream( + mockRes1 as any, + generator1, + { streamId }, + "user-alice", + ); + + const eventIds = events1 + .filter((e) => e.startsWith("id: ")) + .map((e) => e.replace("id: ", "").replace("\n", "")); + + const { mockRes: mockRes2, events: events2 } = createMockResponse({ + "last-event-id": eventIds[1], + }); + + async function* generator2() { + yield { type: "should-not-run" }; + } + + await streamManager.stream( + mockRes2 as any, + generator2, + { streamId }, + "user-alice", + ); + + const replayedData = events2 + .filter((e) => e.startsWith("data: ")) + .map((e) => e.replace("data: ", "").replace("\n\n", "")); + expect(replayedData.length).toBe(1); + expect(replayedData[0]).toContain("event-3"); + expect(events2.some((e) => e.includes("STREAM_FORBIDDEN"))).toBe(false); + }); + + test("treats a missing owner as a distinct identity from a named owner", async () => { + const streamId = "owner-bound-789"; + + const { mockRes: mockRes1 } = createMockResponse(); + + async function* generator1() { + yield { type: "message", data: "scoped" }; + } + + await streamManager.stream( + mockRes1 as any, + generator1, + { streamId }, + "user-alice", + ); + + const { mockRes: mockRes2, events: events2 } = createMockResponse(); + + async function* generator2() { + yield { type: "should-not-run" }; + } + + // Caller without an owner key must not attach to a stream that + // was created with one. + await streamManager.stream(mockRes2 as any, generator2, { streamId }); + + expect(events2.some((e) => e.includes("scoped"))).toBe(false); + expect(events2.some((e) => e.includes("STREAM_FORBIDDEN"))).toBe(true); + }); + test("should replay successfully when within buffer capacity", async () => { const streamId = "no-overflow-test-456"; diff --git a/packages/appkit/src/stream/types.ts b/packages/appkit/src/stream/types.ts index 3841bfd18..bb6f65f6e 100644 --- a/packages/appkit/src/stream/types.ts +++ b/packages/appkit/src/stream/types.ts @@ -16,6 +16,7 @@ export const SSEErrorCode = { INVALID_REQUEST: "INVALID_REQUEST", STREAM_ABORTED: "STREAM_ABORTED", STREAM_EVICTED: "STREAM_EVICTED", + STREAM_FORBIDDEN: "STREAM_FORBIDDEN", UPSTREAM_ERROR: "UPSTREAM_ERROR", } as const satisfies Record; @@ -35,6 +36,12 @@ export interface BufferedEvent { export interface StreamEntry { streamId: string; + /** + * Identifier of the principal that created the stream (e.g. end-user ID + * or service principal user ID). When set, only requests sharing the + * same owner key may reconnect to the stream. + */ + ownerKey?: string; generator: AsyncGenerator; eventBuffer: EventRingBuffer; clients: Set;