Add durable Think submissions API#1511
Conversation
Introduce submitMessages() for Think so RPC callers can durably accept a programmatic chat turn without waiting for model execution to finish. The new API persists a submission row before execution, supports idempotent retries through idempotencyKey, exposes inspection/list/cancel/delete helpers, and emits submission lifecycle observability events. The implementation uses cf_think_submissions as a SQLite-backed ledger and an idempotent scheduled drain as the wakeup mechanism. Submissions move through a small pending/running/terminal state machine, append messages to Session only after being claimed, and use messages_applied_at as the replay boundary so hibernation recovery never duplicates already-applied messages. Pending submissions are synchronously skipped during turn reset, terminal states are protected with conditional updates, and recovered chat continuations stay running until they reach a terminal outcome. Add focused coverage for fast acceptance, idempotent retries, FIFO draining, cancellation, reset races, startup recovery, chat recovery, malformed durable rows, cleanup filters, and recovered-continuation cancellation. Also add user docs, a contributor-facing design doc, a changeset, observability tests, and a dedicated think-submissions example that demonstrates durable submission, retry, status inspection, cancellation, and cleanup flows. Co-authored-by: Cursor <cursoragent@cursor.com>
🦋 Changeset detectedLatest commit: ae9a7f6 The changes in this PR will be included in the next version bump. This PR includes changesets to release 1 package
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
| doneSent = true; | ||
| } catch (error) { | ||
| streamError = error instanceof Error ? error.message : "Stream error"; | ||
| this._programmaticStreamErrors.set(requestId, streamError); |
There was a problem hiding this comment.
🟡 Memory leak: _programmaticStreamErrors entries never cleaned up for non-submission callers of _streamResult
_streamResult sets this._programmaticStreamErrors.set(requestId, streamError) in its catch block (line 3886), but this map is only cleaned up inside _runSubmission's finally block (line 2817). _streamResult is called from 5 different paths — submissions, saveMessages, continueLastTurn, WebSocket chat requests, and auto-continuation — but only the submission path removes entries. For the other 4 callers, each stream error adds a (UUID, errorMessage) entry that is never deleted, accumulating over the DO isolate's lifetime.
Prompt for agents
The _programmaticStreamErrors map is written to inside _streamResult (a shared method called by all chat paths) but only cleaned up inside _runSubmission (the submission-specific path). This means entries accumulate forever for non-submission stream errors (saveMessages, continueLastTurn, WebSocket chat, auto-continuation).
Approach 1: Instead of setting the error in the shared _streamResult method, capture the stream error at the _runSubmission call site. _runSubmission already has access to the result object. You could have _streamResult return or throw the error information, and let _runSubmission capture it locally without using a shared map at all.
Approach 2: If the shared map approach is retained, clean up entries at every call site that invokes _streamResult, not just _runSubmission. This would mean adding cleanup in _runProgrammaticMessagesTurn, the WebSocket chat handler, and the auto-continuation path.
Approach 1 is cleaner since it avoids the shared mutable state entirely.
Was this helpful? React with 👍 or 👎 to provide feedback.
| <head> | ||
| <meta charset="UTF-8" /> | ||
| <meta name="viewport" content="width=device-width, initial-scale=1.0" /> | ||
| <link rel="icon" href="/favicon.svg" /> |
There was a problem hiding this comment.
🟡 Example uses favicon.svg instead of required favicon.ico per examples/AGENTS.md
The examples/AGENTS.md required structure specifies public/favicon.ico and the index.html template uses <link rel="icon" href="/favicon.ico" />. The new think-submissions example uses favicon.svg (examples/think-submissions/index.html:6) and has a public/favicon.svg file instead. This deviates from the documented convention that the majority of examples (25 out of 28) follow.
| <link rel="icon" href="/favicon.svg" /> | |
| <link rel="icon" href="/favicon.ico" /> |
Was this helpful? React with 👍 or 👎 to provide feedback.
agents
@cloudflare/ai-chat
@cloudflare/codemode
hono-agents
@cloudflare/shell
@cloudflare/think
@cloudflare/voice
@cloudflare/worker-bundler
commit: |
Only durable submissions need to capture stream errors in the programmatic stream error map so their terminal row can be marked as error after _streamResult returns. Other callers such as saveMessages, WebSocket chat turns, continuations, and auto-continuation already handle stream errors through the normal response hook path and should not leave request-scoped entries behind for the isolate lifetime. Scope _programmaticStreamErrors writes behind an explicit capture option used by _runSubmission, and add regression coverage that a non-submission stream failure does not retain an entry. Also align the think-submissions example with the examples directory convention by using public/favicon.ico from the standard example favicon instead of a custom favicon.svg. Co-authored-by: Cursor <cursoragent@cursor.com>
A captured programmatic stream error should only turn an otherwise completed submission into an error. If the underlying programmatic turn reports aborted or skipped, those explicit terminal outcomes must win even when abort/reset also surfaced as a stream iterator error. Factor submission final status selection into a helper, clear error_message for non-error terminal states, and add regression coverage for completed+error, aborted+error, and skipped+error precedence. Co-authored-by: Cursor <cursoragent@cursor.com>
Keep the stale-evidence safety net for recovered durable submissions, but expose the recovery freshness window as a protected static setting that Think subclasses can tune for legitimate long-running turns. The default remains 15 minutes, preserving the existing behavior for normal agents while avoiding a hardcoded limit for providers or workloads that can validly run longer before a Durable Object restart. Update the recovery check to read submissionRecoveryStaleMs from the concrete subclass, document the override point in the durable submissions design doc, and add coverage proving that an older recoverable fiber remains running when a subclass extends the stale window. Co-authored-by: Cursor <cursoragent@cursor.com>
Summary
Adds a durable asynchronous submission API to
@cloudflare/thinkso programmatic callers can enqueue a Think turn, get a durable acceptance response quickly, retry safely, and inspect or cancel the turn later.This PR introduces:
submitMessages(messages, { submissionId, idempotencyKey, metadata })for durable programmatic turn submission.pending,running,completed,aborted,skipped, anderror.submission:create,submission:status, andsubmission:error.examples/think-submissionsdemo app.Why
saveMessages()is intentionally blocking: it injects messages and waits for the model turn to finish. That works for in-process callers, but it creates timeout ambiguity for RPC callers, webhooks, and external jobs. If the caller times out, it cannot tell whether the turn was never accepted, is queued, is running, or already completed. Retrying can duplicate messages or run the same external job twice.submitMessages()creates a durable acceptance boundary instead: the caller gets back a persisted submission record before inference starts, and can retry byidempotencyKeywithout appending duplicate messages.Addresses the core problem in #1479.
Design
The implementation is Think-specific rather than a base
Agentprimitive because the hard parts are chat semantics:UIMessage[]The durable source of truth is a new
cf_think_submissionsSQLite table. Rows are drained FIFO with an idempotent scheduled wakeup, and indexed for queue, request lookup, and terminal cleanup paths.Important invariants:
submitMessages()returnsaccepted: true.pendingtorunning.messages_applied_atis set only after Session append succeeds, and acts as the replay safety boundary.runningsubmissions, but never replays if any submitted message is already present in Session.skippedduring turn reset so they cannot be claimed after reset.abortedorskipped.AbortSignalis intentionally not part ofsubmitMessages(); durable cancellation goes throughcancelSubmission(submissionId).Docs And Example
This PR adds:
docs/think/programmatic-submissions.mdfor user-facing API guidance.design/think-durable-submissions.mdfor contributor-facing rationale, state transitions, failure boundaries, recovery rules, and operational guidance.submitMessages()fromsaveMessages()and workflows.examples/think-submissions, a focused demo of durable submission, idempotent retry, status inspection, cancellation, cleanup, and chat status visualization.Test Coverage
Adds
packages/think/src/tests/submissions.test.tswith coverage for:submissionId/idempotencyKeypairsAlso extends observability tests for
submission:*event routing.Validation
Ran the following successfully:
npm run test -w @cloudflare/think -- src/tests/submissions.test.ts— 29 passednpm run test -w @cloudflare/think— 325 passednpm run build -w @cloudflare/thinknpm run check— exports, formatting, oxlint, and typecheck across 82 projectsnpm run test— full workspace test suitenpx vite buildinexamples/think-submissionsMade with Cursor