Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/calm-cups-reflect.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"agents": patch
"@cloudflare/ai-chat": patch
---

Avoid throwing when chat stream resume/replay races with a closed WebSocket connection.
58 changes: 58 additions & 0 deletions .github/workflows/semgrep.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
name: Semgrep OSS scan
on:
pull_request: {}
push:
branches: [main, master]
workflow_dispatch: {}
schedule:
# Run at midnight UTC on Saturdays. The `gate` job below filters
# scheduled runs to the first Saturday of each month. We avoid
# encoding "first Saturday" in cron itself because POSIX cron applies
# OR semantics when both day-of-month and day-of-week are
# non-wildcard, so e.g. `0 0 1-7 * 6` would fire every day 1-7 *and*
# every Saturday, not just the first Saturday.
- cron: "0 0 * * 6"
concurrency:
group: semgrep-${{ github.event_name }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
permissions:
contents: read
jobs:
gate:
name: gate
runs-on: ubuntu-slim
outputs:
run: ${{ steps.check.outputs.run }}
steps:
- id: check
shell: bash
run: |
if [ "${{ github.event_name }}" != "schedule" ]; then
echo "run=true" >> "$GITHUB_OUTPUT"
exit 0
fi
DAY=$(date -u +%-d)
if [ "$DAY" -le 7 ]; then
echo "run=true" >> "$GITHUB_OUTPUT"
else
echo "run=false" >> "$GITHUB_OUTPUT"
echo "Skipping scheduled run: day-of-month $DAY is not in the first 7 days."
fi
semgrep:
name: semgrep-oss
needs: [gate]
if: needs.gate.outputs.run == 'true'
runs-on: ubuntu-slim
steps:
- uses: actions/checkout@v6
with:
fetch-depth: 1
- id: cache-semgrep
uses: actions/cache@v5
with:
path: ~/.local
key: semgrep-1.160.0-${{ runner.os }}
- if: steps.cache-semgrep.outputs.cache-hit != 'true'
run: pip install --user semgrep==1.160.0
- run: echo "$HOME/.local/bin" >> "$GITHUB_PATH"
- run: semgrep scan --config=auto
87 changes: 63 additions & 24 deletions packages/agents/src/chat/resumable-stream.ts
Comment thread
whoiskatrin marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ const CLEANUP_AGE_THRESHOLD_MS = 24 * 60 * 60 * 1000;
/** Shared encoder for UTF-8 byte length measurement */
const textEncoder = new TextEncoder();

function sendIfOpen(connection: Connection, message: string): boolean {
try {
connection.send(message);
return true;
} catch (error) {
if (isWebSocketClosedSendError(error)) return false;
throw error;
}
}

function isWebSocketClosedSendError(error: unknown): boolean {
return (
error instanceof TypeError &&
error.message.includes("WebSocket send() after close")
);
}

/**
* Stored stream chunk for resumable streaming
*/
Expand Down Expand Up @@ -277,6 +294,10 @@ export class ResumableStream {
* reconstruct and persist the partial message from the stored chunks.
* - **Completed during replay** (defensive): sends chunks + `done`.
*
* All sends use {@link sendIfOpen}, so a WebSocket closing mid-replay
* does not throw. If the connection drops while iterating chunks the
* stream is left active so the next reconnect can retry.
*
* @param connection - The WebSocket connection
* @param requestId - The original request ID
* @returns The stream ID if the stream was orphaned and finalized, null otherwise.
Expand All @@ -295,22 +316,30 @@ export class ResumableStream {
`;

for (const chunk of chunks || []) {
connection.send(
JSON.stringify({
body: chunk.body,
done: false,
id: requestId,
type: CHAT_MESSAGE_TYPES.USE_CHAT_RESPONSE,
replay: true
})
);
if (
!sendIfOpen(
connection,
JSON.stringify({
body: chunk.body,
done: false,
id: requestId,
type: CHAT_MESSAGE_TYPES.USE_CHAT_RESPONSE,
replay: true
})
)
) {
// Connection closed mid-replay — leave the stream active so the
// next reconnect can retry from the start.
return null;
}
}

if (this._activeStreamId !== streamId) {
// Stream completed between our check above and now — send done.
// In practice this cannot happen (DO is single-threaded and replay is
// synchronous), but we guard defensively in case the flow changes.
connection.send(
sendIfOpen(
connection,
JSON.stringify({
body: "",
done: true,
Expand All @@ -325,8 +354,12 @@ export class ResumableStream {
if (!this._isLive) {
// Orphaned stream — restored from SQLite after hibernation but the
// LLM ReadableStream reader was lost. No more live chunks will ever
// arrive, so finalize it: send done and mark completed in SQLite.
connection.send(
// arrive, so finalize it: best-effort send done, then mark completed
// in SQLite. The orphan-cleanup decision is committed regardless of
// whether this particular connection received the done frame, so the
// caller can persist the reconstructed message.
sendIfOpen(
connection,
JSON.stringify({
body: "",
done: true,
Expand All @@ -343,7 +376,8 @@ export class ResumableStream {
// complete so the client can flush accumulated parts to React state.
// Without this, replayed chunks sit in activeStreamRef unflushed
// until the next live chunk arrives.
connection.send(
sendIfOpen(
connection,
JSON.stringify({
body: "",
done: false,
Expand Down Expand Up @@ -379,18 +413,24 @@ export class ResumableStream {
`;

for (const chunk of chunks || []) {
connection.send(
JSON.stringify({
body: chunk.body,
done: false,
id: requestId,
type: CHAT_MESSAGE_TYPES.USE_CHAT_RESPONSE,
replay: true
})
);
if (
!sendIfOpen(
connection,
JSON.stringify({
body: chunk.body,
done: false,
id: requestId,
type: CHAT_MESSAGE_TYPES.USE_CHAT_RESPONSE,
replay: true
})
)
) {
return false;
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Comment thread
whoiskatrin marked this conversation as resolved.
}
}

connection.send(
return sendIfOpen(
connection,
JSON.stringify({
body: "",
done: true,
Expand All @@ -399,7 +439,6 @@ export class ResumableStream {
replay: true
})
);
return true;
}

// ── Restore / cleanup ──────────────────────────────────────────────
Expand Down
20 changes: 19 additions & 1 deletion packages/ai-chat/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@ export type {
SaveMessagesResult
} from "agents/chat";

function sendIfOpen(connection: Connection, message: string): boolean {
try {
connection.send(message);
return true;
} catch (error) {
if (isWebSocketClosedSendError(error)) return false;
throw error;
}
}

function isWebSocketClosedSendError(error: unknown): boolean {
return (
error instanceof TypeError &&
error.message.includes("WebSocket send() after close")
);
}

export type ChatMessage = UIMessage;

const TIMED_OUT = Symbol("timed-out");
Expand Down Expand Up @@ -881,7 +898,8 @@ export class AIChatAgent<
data.id
)
) {
connection.send(
sendIfOpen(
connection,
JSON.stringify({
body: "",
done: true,
Expand Down
Loading