Wire encryption into serialization and fix EventsConsumer for async replay#957
Wire encryption into serialization and fix EventsConsumer for async replay#957TooTallNate wants to merge 15 commits intofix/async-deserialization-orderingfrom
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
🦋 Changeset detectedLatest commit: f84db02 The changes in this PR will be included in the next version bump. This PR includes changesets to release 15 packages
Not sure what this means? Click here to learn what changesets are. Click here if you're a maintainer who wants to add another changeset to this PR |
🧪 E2E Test Results❌ Some tests failed Summary
❌ Failed Tests🌍 Community Worlds (46 failed)mongodb (1 failed):
turso (45 failed):
Details by Category✅ ▲ Vercel Production
✅ 💻 Local Development
✅ 📦 Local Production
✅ 🐘 Local Postgres
✅ 🪟 Windows
❌ 🌍 Community Worlds
✅ 📋 Other
|
There was a problem hiding this comment.
Pull request overview
This PR implements end-to-end encryption for workflow user data by wiring encryption functionality into the serialization layer. It builds on previous PRs that generated runId client-side (#954), made serialization functions async (#955), and added the Vercel encryption implementation (#956).
Changes:
- Adds encryption/decryption helper functions (
maybeEncrypt,maybeDecrypt,isEncrypted,peekFormatPrefix) and unused stream encryption utilities (getEncryptStream,getDecryptStream) - Adds
ENCRYPTED('encr') format prefix toSerializationFormatenum - Wires encryption into all 8 dehydrate/hydrate functions by calling
maybeEncryptafter serialization andmaybeDecryptbefore deserialization - Implements inline stream encryption in
WorkflowServerWritableStreamandWorkflowServerReadableStream - Passes
runIdtoWorkflowServerReadableStreamfor decryption context - Adds 8 comprehensive integration tests using a mock XOR encryptor
- Removes unused
_prefixes fromencryptorandrunIdparameters (now actively used) - Adds type casts (
as any[],as any,as TResult) to hydration call sites to preserve type information
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/core/src/serialization.ts | Core encryption implementation: adds format prefix, helper functions, encryption wiring in dehydrate/hydrate functions, and inline stream encryption/decryption |
| packages/core/src/workflow.ts | Adds type cast for hydrated workflow arguments |
| packages/core/src/runtime/step-handler.ts | Adds type cast for hydrated step arguments |
| packages/core/src/runtime/run.ts | Adds type cast for hydrated workflow return value |
| packages/core/src/serialization.test.ts | Adds 8 encryption integration tests with mock XOR encryptor |
| .changeset/e2e-encryption.md | Documents the end-to-end encryption feature addition |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
efd031c to
c1795c7
Compare
7e25694 to
e5b148e
Compare
c1795c7 to
60fac27
Compare
e5b148e to
c686cb4
Compare
f86190e to
3201957
Compare
a815366 to
ded4ecb
Compare
pranaygp
left a comment
There was a problem hiding this comment.
Review: PR #957 - Wire encryption into serialization layer
Summary: This is the "light the fuse" PR that actually enables encryption. It wires maybeEncrypt/maybeDecrypt into all 8 dehydrate/hydrate functions, adds stream encryption/decryption support, and adds the ENCRYPTED format prefix.
Strengths
-
Opt-in design: Encryption is entirely opt-in. When no
Encryptoris provided (or it has noencrypt/decryptmethods), data passes through unchanged. Existing unencrypted workflows continue to work. -
Backwards compatibility:
maybeDecryptchecks for theencrprefix before attempting decryption, so encrypted and unencrypted data can coexist. Old runs withdevlprefix are handled correctly. The test "should handle unencrypted data when encryptor is provided" explicitly validates this. -
Stream encryption: Both
WorkflowServerWritableStream(encrypt on write) andWorkflowServerReadableStream(decrypt on read) are updated. ThegetEncryptStream/getDecryptStreamtransform streams are a clean abstraction. -
Error messages: Good error messages when encrypted data is encountered but no decryptor is available, pointing users to set
VERCEL_DEPLOYMENT_KEY.
Concerns
-
dehydrateWorkflowReturnValuelostv1Compatparameter: In the diff,dehydrateWorkflowReturnValueno longer acceptsv1Compat. Theif (v1Compat) { return revive(str); }branch is removed. This means legacy specVersion 1 workflows can no longer serialize return values. Is this intentional? If so, this is a breaking change that should be documented. Currently the caller inworkflow.tsdoesn't passv1Compatfor return values, but any external callers would be affected. -
Stream encryption uses
world.encryptdirectly: InWorkflowServerWritableStream, the encryption callsworld.encrypt!(chunk, { runId }). But theworldis obtained fromgetWorld()at construction time. This means stream encryption always uses the current deployment's encryptor, not a resolved per-run encryptor. For cross-deployment scenarios (reading streams from a different deployment), this could be an issue. Similarly,WorkflowServerReadableStreamcallsworld.decryptdirectly. Consider whether stream encryption should also go throughresolveEncryptorForRun. -
Removed runId guards in step reducers: The PR removes the
if (!runId)throw guards fromgetStepReducersfor ReadableStream and WritableStream serialization. While these were likely never hit in practice (runId is always available during step execution), removing them silently means any future bugs would produce confusing errors downstream instead of a clear error message at the serialization boundary. -
Type assertions proliferation: Several callers now need
as TResult,as any,as any[]casts after the hydrate calls because the return types changed toPromise<unknown>. Consider updating the hydrate function signatures to use generics, e.g.:async function hydrateWorkflowReturnValue<T = unknown>(...): Promise<T>
This would reduce the cast noise at call sites.
-
Changeset scope: The changeset includes
@workflow/web-sharedand@workflow/world-testingbut the changes in those packages are just updating call sites to pass the encryptor parameter. Consider whether these packages' users need to know about this change or if the changeset entries for them add noise.
Test Coverage
The 8 new encryption integration tests using the XOR mock encryptor are good:
- Encrypt/decrypt round-trip for workflow args, step args, step return values, workflow return values
- Wrong runId produces decryption failure
- No encryptor = no encryption (devl prefix preserved)
- Encrypted data works with encryptor on hydrate
- Unencrypted data works with encryptor on hydrate
Missing test coverage:
- Stream encryption/decryption (getEncryptStream/getDecryptStream and the Writable/Readable stream integration)
- Error case: encrypted data encountered but no decrypt function available
- Mixed encrypted/unencrypted chunks in the same stream
Overall
The encryption wiring is well-designed with clean opt-in semantics and good backwards compatibility. The main concerns are the v1Compat removal (potential breaking change), the direct world.encrypt/world.decrypt usage in streams (vs per-run encryptor resolution), and missing stream encryption test coverage.
pranaygp
left a comment
There was a problem hiding this comment.
Review: Wire encryption into serialization layer
Solid PR — the encryption wiring is clean, opt-in, and backwards-compatible. The maybeEncrypt/maybeDecrypt pattern is simple and effective, and the encr prefix nesting around the inner devl prefix is a nice layered design.
A few issues below, mostly around performance (repeated key lookups on every stream chunk/flush) and a stale comment.
| @@ -725,7 +812,7 @@ export function getExternalReducers( | |||
| const streamId = ((global as any)[STABLE_ULID] || defaultUlid)(); | |||
There was a problem hiding this comment.
Performance: encryption key fetched on every stream flush
world.getEncryptionKeyForRun?.(runId) is called inside flush(), which runs every 10ms (STREAM_FLUSH_INTERVAL_MS) when the stream is active. If this is backed by a network call (e.g., KMS or an API), that's a lot of per-flush overhead.
Consider caching the key once at construction time or on first flush:
let cachedKey: Uint8Array | undefined;
// in flush:
const key = cachedKey ??= await world.getEncryptionKeyForRun?.(runId);Same issue exists in WorkflowServerReadableStream (line ~392) where the key is fetched on every pull() call — each chunk triggers a new key lookup.
There was a problem hiding this comment.
Fixed. The writable stream now resolves the key once at construction time (lazy, on first flush) and caches the CryptoKey promise. The readable stream caches the CryptoKey in a private field after the first encrypted chunk. Neither path calls getEncryptionKeyForRun more than once per stream instance.
| const startTime = Date.now(); | ||
| const encryptionKey = | ||
| await world.getEncryptionKeyForRun?.(workflowRunId); | ||
| const result = await hydrateStepArguments( |
There was a problem hiding this comment.
nit: Double as any cast — the inner cast on line 310 (hydrateStepArguments(...) as any) and the outer cast on line 303 ((await trace(...)) as any) are redundant. Only the outer one is needed since trace infers its return type from the callback. Removing the inner cast reduces noise.
There was a problem hiding this comment.
Fixed. Removed the inner as any on hydrateStepArguments. The outer cast now uses a descriptive type: as { args: any[]; thisVal?: any; closureVars?: any } instead of as any. Added a comment explaining why the cast is needed (serialization layer returns unknown).
| @@ -0,0 +1,6 @@ | |||
| --- | |||
| "@workflow/core": patch | |||
| "@workflow/world-vercel": patch | |||
There was a problem hiding this comment.
No changes in world-vercel
There was a problem hiding this comment.
Still no changes in world-vercel
There was a problem hiding this comment.
Fixed — removed @workflow/world-vercel from the changeset.
| @@ -0,0 +1,6 @@ | |||
| --- | |||
| "@workflow/core": patch | |||
| "@workflow/world-vercel": patch | |||
There was a problem hiding this comment.
Still no changes in world-vercel
| encryptionKey, | ||
| ops | ||
| ); | ||
| )) as any; |
There was a problem hiding this comment.
I do think any casts could warrant a comment, i.e. what's missing or why is TS confused
There was a problem hiding this comment.
Ya I'm not sure what this is about. We shouldn't be doing as any. Will fix.
There was a problem hiding this comment.
Added a comment explaining the cast and replaced as any with a typed assertion.
…llers to import once per run
…rkflowRun in resume-hook
…ncrypted, complex type round-trips
…import once per run
…sts, fix stale comments

Summary
Wires AES-GCM encryption into the serialization layer and redesigns the EventsConsumer to handle async decryption and out-of-order event logs:
Serialization
maybeEncrypt/maybeDecryptinserialization.tswithencr4-byte format prefixkey: CryptoKey | undefinedEventsConsumer redesign
step_createdevents from async DB writes without deadlocking.setTimeout(0)unconsumed event check. Only fires if replay is truly deadlocked — no progress for 1 second.enqueueResolve(): Step/hook/sleep callbacks use this instead of rawsetTimeoutfor async work (decryption). TrackspendingResolvescount to suppress watchdog during async operations.onEventConsumedcallback: Passive timestamp observation without participating in event matching.suppressUnconsumedCheck()/unsuppressUnconsumedCheck(): Used around workflow args decryption to prevent premature watchdog firing.Dual-mode test suite
workflow.test.tstests run twice: once without encryption and once with a real AES-256-GCM key (130 total)