From bde4376fda02f3b6c2cb9ec4d9b40be5cc7fd289 Mon Sep 17 00:00:00 2001 From: Benjamin Favre Date: Wed, 18 Mar 2026 11:28:13 +0000 Subject: [PATCH 1/4] feat: add Node.js native stream helpers for render pipeline Add `node-stream-helpers.ts` with Node.js native stream utilities that parallel the WhatWG stream helpers in `node-web-streams-helper.ts`. These are the foundational building blocks needed for the node-streams rendering effort (PRs #89566, #89859, #89860, #90500). Key functions: - `chainNodeStreams()` - chains multiple Readable streams sequentially - `createBufferedTransformNode()` - batches small chunks before flushing - `createInlinedDataNodeStream()` - inlines flight data into HTML stream - `pipeNodeReadableToResponse()` - pipes Readable directly to ServerResponse - `nodeStreamToBuffer()` / `nodeStreamToString()` - collection utilities ALS context propagation uses `bindSnapshot()` from the existing `async-local-storage.ts` module, which wraps `AsyncLocalStorage.bind()`. This addresses the review feedback from @lubieowoce on PR #89859 where ALS context was incorrectly propagated by wrapping callback return values instead of binding the callbacks themselves. This PR adds only the helper utilities as new files. No existing files are modified. Wiring into the render pipeline is a separate step. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../stream-utils/node-stream-helpers.test.ts | 292 +++++++++++++ .../stream-utils/node-stream-helpers.ts | 396 ++++++++++++++++++ 2 files changed, 688 insertions(+) create mode 100644 packages/next/src/server/stream-utils/node-stream-helpers.test.ts create mode 100644 packages/next/src/server/stream-utils/node-stream-helpers.ts diff --git a/packages/next/src/server/stream-utils/node-stream-helpers.test.ts b/packages/next/src/server/stream-utils/node-stream-helpers.test.ts new file mode 100644 index 000000000000..abf0cb802221 --- /dev/null +++ b/packages/next/src/server/stream-utils/node-stream-helpers.test.ts @@ -0,0 +1,292 @@ +import { Readable, PassThrough, Transform } from 'node:stream' +import { createServer, type Server } from 'node:http' +import type { AddressInfo } from 'node:net' + +import { + chainNodeStreams, + createBufferedTransformNode, + createInlinedDataNodeStream, + pipeNodeReadableToResponse, + nodeStreamToBuffer, + nodeStreamToString, +} from './node-stream-helpers' + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Creates a Readable that emits the given strings as Buffer chunks. */ +function readableFrom(chunks: string[]): Readable { + const buffers = chunks.map((c) => Buffer.from(c, 'utf-8')) + let index = 0 + return new Readable({ + read() { + if (index < buffers.length) { + this.push(buffers[index++]) + } else { + this.push(null) + } + }, + }) +} + +/** Collect all data from a Readable into a UTF-8 string. */ +async function collectString(readable: Readable): Promise { + return nodeStreamToString(readable) +} + +/** Delay helper for async tests */ +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +// --------------------------------------------------------------------------- +// chainNodeStreams +// --------------------------------------------------------------------------- + +describe('chainNodeStreams', () => { + it('returns an empty (ended) stream when called with no arguments', async () => { + const result = chainNodeStreams() + const str = await collectString(result) + expect(str).toBe('') + }) + + it('returns the same stream when called with a single argument', () => { + const source = readableFrom(['hello']) + const result = chainNodeStreams(source) + // Should be the exact same object, not a wrapper. + expect(result).toBe(source) + }) + + it('chains two streams in order', async () => { + const a = readableFrom(['hello ']) + const b = readableFrom(['world']) + const result = await collectString(chainNodeStreams(a, b)) + expect(result).toBe('hello world') + }) + + it('chains three streams in order', async () => { + const a = readableFrom(['a']) + const b = readableFrom(['b']) + const c = readableFrom(['c']) + const result = await collectString(chainNodeStreams(a, b, c)) + expect(result).toBe('abc') + }) + + it('handles streams that emit multiple chunks', async () => { + const a = readableFrom(['chunk1-', 'chunk2-']) + const b = readableFrom(['chunk3']) + const result = await collectString(chainNodeStreams(a, b)) + expect(result).toBe('chunk1-chunk2-chunk3') + }) + + it('propagates errors from a source stream', async () => { + const errStream = new Readable({ + read() { + this.destroy(new Error('test error')) + }, + }) + const b = readableFrom(['b']) + + const chained = chainNodeStreams(errStream, b) + + await expect(collectString(chained)).rejects.toThrow('test error') + }) +}) + +// --------------------------------------------------------------------------- +// createBufferedTransformNode +// --------------------------------------------------------------------------- + +describe('createBufferedTransformNode', () => { + it('buffers small chunks and flushes them as a single chunk', async () => { + const source = readableFrom(['a', 'b', 'c']) + const transform = createBufferedTransformNode() + + const output = source.pipe(transform) + const result = await nodeStreamToBuffer(output) + + // All three tiny chunks should have been merged. + // The exact number of output chunks depends on timing, but the + // concatenated result must be correct. + expect(result.toString('utf-8')).toBe('abc') + }) + + it('flushes immediately when maxBufferBytes is reached', async () => { + const transform = createBufferedTransformNode(5) + const outputChunks: Buffer[] = [] + + transform.on('data', (chunk: Buffer) => { + outputChunks.push(chunk) + }) + + const done = new Promise((resolve) => { + transform.on('end', resolve) + }) + + // Write a chunk that exceeds the buffer limit. + transform.write(Buffer.from('hello world')) + transform.end() + + await done + + // The first chunk should have been flushed synchronously because it + // exceeded the buffer limit. + expect(Buffer.concat(outputChunks).toString('utf-8')).toBe('hello world') + }) + + it('flushes remaining data when the stream ends', async () => { + const source = readableFrom(['partial']) + const transform = createBufferedTransformNode(1024) + + const output = source.pipe(transform) + const result = await collectString(output) + expect(result).toBe('partial') + }) +}) + +// --------------------------------------------------------------------------- +// createInlinedDataNodeStream +// --------------------------------------------------------------------------- + +describe('createInlinedDataNodeStream', () => { + it('inlines data chunks alongside HTML chunks (no delay)', async () => { + const dataStream = readableFrom(['']) + const transform = createInlinedDataNodeStream(dataStream, false) + + const htmlStream = readableFrom(['', '']) + const output = htmlStream.pipe(transform) + + const result = await collectString(output) + // HTML chunks should appear, and data should be interleaved. + expect(result).toContain('') + expect(result).toContain('') + expect(result).toContain('') + }) + + it('delays data until first HTML chunk when delayDataUntilFirstHtmlChunk=true', async () => { + const dataStream = readableFrom(['']) + const transform = createInlinedDataNodeStream(dataStream, true) + + const htmlStream = readableFrom(['shell']) + const output = htmlStream.pipe(transform) + + const result = await collectString(output) + expect(result).toContain('shell') + expect(result).toContain('') + }) + + it('handles empty data stream', async () => { + const dataStream = readableFrom([]) + const transform = createInlinedDataNodeStream(dataStream, false) + + const htmlStream = readableFrom(['content']) + const output = htmlStream.pipe(transform) + + const result = await collectString(output) + expect(result).toBe('content') + }) + + it('handles data arriving after HTML finishes', async () => { + // Create a data stream that emits after a delay. + const dataPassthrough = new PassThrough() + const transform = createInlinedDataNodeStream(dataPassthrough, false) + + const htmlStream = readableFrom(['']) + const output = htmlStream.pipe(transform) + + // Emit data after a small delay. + setTimeout(() => { + dataPassthrough.write(Buffer.from('')) + dataPassthrough.end() + }, 50) + + const result = await collectString(output) + expect(result).toContain('') + expect(result).toContain('') + }) +}) + +// --------------------------------------------------------------------------- +// pipeNodeReadableToResponse +// --------------------------------------------------------------------------- + +describe('pipeNodeReadableToResponse', () => { + let server: Server + + afterEach(async () => { + if (server) { + await new Promise((resolve) => server.close(() => resolve())) + } + }) + + it('pipes readable data to a ServerResponse', async () => { + const received = new Promise((resolve, reject) => { + server = createServer((req, res) => { + const source = readableFrom(['hello', ' ', 'world']) + pipeNodeReadableToResponse(source, res, () => { + // onEnd callback should fire. + }) + }) + + server.listen(0, () => { + const { port } = server.address() as AddressInfo + fetch(`http://127.0.0.1:${port}`) + .then((r) => r.text()) + .then(resolve) + .catch(reject) + }) + }) + + expect(await received).toBe('hello world') + }) + + it('handles already-destroyed responses gracefully', () => { + // Simulate a destroyed response (e.g. client disconnected). + const fakeRes = { + destroyed: true, + writableEnded: false, + flushHeaders: jest.fn(), + on: jest.fn(), + once: jest.fn(), + off: jest.fn(), + pipe: jest.fn(), + write: jest.fn(), + end: jest.fn(), + destroy: jest.fn(), + } as any + + const source = readableFrom(['data']) + const onEnd = jest.fn() + + // Should not throw. + pipeNodeReadableToResponse(source, fakeRes, onEnd) + expect(onEnd).toHaveBeenCalled() + }) +}) + +// --------------------------------------------------------------------------- +// nodeStreamToBuffer / nodeStreamToString +// --------------------------------------------------------------------------- + +describe('nodeStreamToBuffer', () => { + it('collects all chunks into a Buffer', async () => { + const source = readableFrom(['hello', ' ', 'world']) + const buf = await nodeStreamToBuffer(source) + expect(buf.toString('utf-8')).toBe('hello world') + }) + + it('returns an empty Buffer for an empty stream', async () => { + const source = readableFrom([]) + const buf = await nodeStreamToBuffer(source) + expect(buf.length).toBe(0) + }) +}) + +describe('nodeStreamToString', () => { + it('collects all chunks into a string', async () => { + const source = readableFrom(['foo', 'bar']) + const str = await nodeStreamToString(source) + expect(str).toBe('foobar') + }) +}) diff --git a/packages/next/src/server/stream-utils/node-stream-helpers.ts b/packages/next/src/server/stream-utils/node-stream-helpers.ts new file mode 100644 index 000000000000..eac55f022d8e --- /dev/null +++ b/packages/next/src/server/stream-utils/node-stream-helpers.ts @@ -0,0 +1,396 @@ +/** + * Node.js native stream utilities for the Next.js render pipeline. + * + * These are the Node.js `stream` equivalents of the WhatWG stream helpers + * in `node-web-streams-helper.ts`. Using Node.js native streams avoids the + * overhead of WhatWG stream polyfills and the web-to-node conversion layer, + * which profiling shows accounts for 35%+ of CPU time in SSR workloads. + * + * **ALS context propagation**: All stream callbacks use `bindSnapshot()` from + * `async-local-storage.ts` to capture and restore the current + * `AsyncLocalStorage` context. This ensures that request-scoped stores + * (workUnitAsyncStorage, workAsyncStorage, etc.) remain accessible in + * transform callbacks even when they execute in a different async context + * (e.g. via `setImmediate` or `process.nextTick`). + * + * This addresses the review feedback from @lubieowoce on PRs #89859/#90500 + * where ALS context was incorrectly propagated by wrapping the callback + * return value rather than binding the callback itself. + */ + +import { Transform, Readable, PassThrough } from 'node:stream' +import type { ServerResponse } from 'node:http' +import { bindSnapshot } from '../app-render/async-local-storage' +import { DetachedPromise } from '../../lib/detached-promise' + +// --------------------------------------------------------------------------- +// chainNodeStreams +// --------------------------------------------------------------------------- + +/** + * Chains multiple Node.js Readable streams sequentially into a single + * Readable. Data from the first stream is emitted first, then the second, + * and so on. + * + * This is the Node.js equivalent of `chainStreams()` in + * `node-web-streams-helper.ts`. + */ +export function chainNodeStreams(...streams: Readable[]): Readable { + if (streams.length === 0) { + // Return an immediately-ended readable (equivalent to ReadableStream that + // closes in start()). + const empty = new PassThrough() + empty.end() + return empty + } + + if (streams.length === 1) { + return streams[0] + } + + const output = new PassThrough() + + // Pipe streams sequentially. When one ends, start the next. + let index = 0 + + function pipeNext() { + if (index >= streams.length) { + output.end() + return + } + + const current = streams[index++] + + // Bind the 'end' handler so that ALS context is preserved across the + // async boundary between streams. + const onEnd = bindSnapshot(() => { + pipeNext() + }) + + const onError = bindSnapshot((err: Error) => { + output.destroy(err) + }) + + // `end: false` prevents the PassThrough from closing when each + // individual source ends -- we close it manually after the last one. + current.pipe(output, { end: false }) + current.once('end', onEnd) + current.once('error', onError) + } + + pipeNext() + + return output +} + +// --------------------------------------------------------------------------- +// createBufferedTransformNode +// --------------------------------------------------------------------------- + +/** + * Creates a Transform stream that batches small chunks before flushing them + * downstream. Chunks are accumulated until either: + * - The buffer reaches `maxBufferBytes`, at which point it flushes + * synchronously, or + * - A `setImmediate` tick fires, flushing whatever has been buffered. + * + * This is the Node.js equivalent of `createBufferedTransformStream()` in + * `node-web-streams-helper.ts`. + */ +export function createBufferedTransformNode( + maxBufferBytes: number = Infinity +): Transform { + let bufferedChunks: Buffer[] = [] + let bufferByteLength = 0 + let pendingFlush: ReturnType | null = null + + function flushBuffer(transform: Transform) { + if (bufferedChunks.length === 0) return + + const merged = Buffer.concat(bufferedChunks, bufferByteLength) + bufferedChunks = [] + bufferByteLength = 0 + + transform.push(merged) + } + + return new Transform({ + transform: bindSnapshot(function ( + this: Transform, + chunk: Buffer, + _encoding: string, + callback: (error?: Error | null) => void + ) { + bufferedChunks.push(chunk) + bufferByteLength += chunk.length + + if (bufferByteLength >= maxBufferBytes) { + // Flush synchronously when the buffer is large enough. + if (pendingFlush !== null) { + clearImmediate(pendingFlush) + pendingFlush = null + } + flushBuffer(this) + callback() + return + } + + // Schedule a flush on the next event loop iteration so that multiple + // small chunks arriving in the same tick get batched together. + if (pendingFlush === null) { + pendingFlush = setImmediate( + bindSnapshot(() => { + pendingFlush = null + flushBuffer(this) + }) + ) + } + callback() + }), + + flush: bindSnapshot(function ( + this: Transform, + callback: (error?: Error | null) => void + ) { + if (pendingFlush !== null) { + clearImmediate(pendingFlush) + pendingFlush = null + } + flushBuffer(this) + callback() + }), + }) +} + +// --------------------------------------------------------------------------- +// createInlinedDataNodeStream +// --------------------------------------------------------------------------- + +/** + * Creates a Transform that inlines flight data from a source Readable stream + * into the HTML stream. As the HTML flows through, chunks from `dataStream` + * are pulled and enqueued into the output interleaved with the HTML. + * + * This is the Node.js equivalent of + * `createFlightDataInjectionTransformStream()` in + * `node-web-streams-helper.ts`. + * + * @param dataStream - The RSC / flight data stream to inline + * @param delayDataUntilFirstHtmlChunk - When true, data pulling does not + * start until the first HTML chunk arrives (used for streaming SSR where + * we want the shell to flush first). + */ +export function createInlinedDataNodeStream( + dataStream: Readable, + delayDataUntilFirstHtmlChunk: boolean +): Transform { + let htmlStreamFinished = false + let dataPullingStarted = false + let dataExhausted = false + + // Collected data chunks that arrived from `dataStream` and are ready to + // be pushed downstream. + let pendingDataChunks: Buffer[] = [] + + // A promise that resolves when all data has been consumed from + // `dataStream`. Used in `flush()` to wait for remaining data. + const dataComplete = new DetachedPromise() + + function startPulling(transform: Transform) { + if (dataPullingStarted) return + dataPullingStarted = true + + const onData = bindSnapshot((chunk: Buffer) => { + if (htmlStreamFinished) { + // If the HTML stream is already done, we push data directly. + transform.push(chunk) + } else { + pendingDataChunks.push(chunk) + } + }) + + const onEnd = bindSnapshot(() => { + dataExhausted = true + dataComplete.resolve() + }) + + const onError = bindSnapshot((err: Error) => { + transform.destroy(err) + dataComplete.resolve() + }) + + if (delayDataUntilFirstHtmlChunk) { + // Wait one tick so the shell can flush first, matching the web stream + // helper behavior. + setImmediate( + bindSnapshot(() => { + dataStream.on('data', onData) + dataStream.once('end', onEnd) + dataStream.once('error', onError) + }) + ) + } else { + dataStream.on('data', onData) + dataStream.once('end', onEnd) + dataStream.once('error', onError) + } + } + + return new Transform({ + transform: bindSnapshot(function ( + this: Transform, + chunk: Buffer, + _encoding: string, + callback: (error?: Error | null) => void + ) { + // Pass through the HTML chunk. + this.push(chunk) + + // Start pulling data on the first HTML chunk if delayed, or + // immediately on construction if not delayed. + if (delayDataUntilFirstHtmlChunk) { + startPulling(this) + } + + // Flush any pending data chunks that have accumulated. + if (pendingDataChunks.length > 0) { + for (const dataChunk of pendingDataChunks) { + this.push(dataChunk) + } + pendingDataChunks = [] + } + + callback() + }), + + flush: bindSnapshot(function ( + this: Transform, + callback: (error?: Error | null) => void + ) { + htmlStreamFinished = true + + // Flush any remaining pending data chunks. + for (const dataChunk of pendingDataChunks) { + this.push(dataChunk) + } + pendingDataChunks = [] + + if (dataExhausted) { + callback() + return + } + + // Wait for the data stream to finish. + dataComplete.promise.then(() => { + callback() + }) + }), + + // Start pulling immediately if we are not delaying. + ...(delayDataUntilFirstHtmlChunk + ? {} + : { + construct: bindSnapshot(function ( + this: Transform, + callback: (error?: Error | null) => void + ) { + startPulling(this) + callback() + }), + }), + }) +} + +// --------------------------------------------------------------------------- +// pipeNodeReadableToResponse +// --------------------------------------------------------------------------- + +/** + * Pipes a Node.js Readable directly to a ServerResponse without going + * through the WhatWG WritableStream conversion. This avoids the overhead + * of `pipe-readable.ts`'s `createWriterFromResponse()` → `pipeTo()` path + * when we already have a Node.js native stream. + * + * Handles backpressure via the standard `.pipe()` mechanism and properly + * cleans up on client disconnect. + * + * @param readable - The source Node.js Readable stream + * @param res - The Node.js ServerResponse to write to + * @param onEnd - Optional callback invoked when piping completes + */ +export function pipeNodeReadableToResponse( + readable: Readable, + res: ServerResponse, + onEnd?: () => void +): void { + if (res.destroyed || res.writableEnded) { + readable.destroy() + onEnd?.() + return + } + + const done = new DetachedPromise() + + const onFinish = bindSnapshot(() => { + cleanup() + done.resolve() + onEnd?.() + }) + + const onError = bindSnapshot((err: Error) => { + cleanup() + if (!res.destroyed) { + res.destroy(err) + } + done.resolve() + // Do not call onEnd on error -- the response is already destroyed. + }) + + const onClose = bindSnapshot(() => { + // Client disconnected. Destroy the readable to stop processing. + if (!readable.destroyed) { + readable.destroy() + } + cleanup() + done.resolve() + }) + + function cleanup() { + readable.off('error', onError) + res.off('close', onClose) + res.off('finish', onFinish) + } + + readable.once('error', onError) + res.once('close', onClose) + res.once('finish', onFinish) + + // Flush headers before data starts flowing. + res.flushHeaders() + + // Use pipe() for automatic backpressure handling. + readable.pipe(res) +} + +// --------------------------------------------------------------------------- +// nodeStreamToBuffer / nodeStreamToString +// --------------------------------------------------------------------------- + +/** + * Collects all chunks from a Node.js Readable into a single Buffer. + */ +export async function nodeStreamToBuffer(readable: Readable): Promise { + const chunks: Buffer[] = [] + for await (const chunk of readable) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)) + } + return Buffer.concat(chunks) +} + +/** + * Collects all chunks from a Node.js Readable into a string. + */ +export async function nodeStreamToString(readable: Readable): Promise { + const buf = await nodeStreamToBuffer(readable) + return buf.toString('utf-8') +} From c0568fe12b7291600583d910de7deffdffbfb84f Mon Sep 17 00:00:00 2001 From: Benjamin Favre Date: Wed, 18 Mar 2026 13:31:33 +0000 Subject: [PATCH 2/4] fix: improve node stream helpers based on review - Use lazy require('node:stream') for Edge/DCE compatibility - Use scheduleImmediate instead of raw setImmediate - Use streaming TextDecoder in nodeStreamToString - Add safePipe helper for proper error propagation - Fix pipeNodeReadableToResponse onClose/onEnd semantics - Add TODO for Buffer-based tag indexOf optimization Co-Authored-By: Claude Opus 4.6 (1M context) --- .../stream-utils/node-stream-helpers.test.ts | 27 ++++- .../stream-utils/node-stream-helpers.ts | 111 +++++++++++++----- 2 files changed, 110 insertions(+), 28 deletions(-) diff --git a/packages/next/src/server/stream-utils/node-stream-helpers.test.ts b/packages/next/src/server/stream-utils/node-stream-helpers.test.ts index abf0cb802221..37650abe9b61 100644 --- a/packages/next/src/server/stream-utils/node-stream-helpers.test.ts +++ b/packages/next/src/server/stream-utils/node-stream-helpers.test.ts @@ -1,4 +1,4 @@ -import { Readable, PassThrough, Transform } from 'node:stream' +import { Readable, PassThrough } from 'node:stream' import { createServer, type Server } from 'node:http' import type { AddressInfo } from 'node:net' @@ -289,4 +289,29 @@ describe('nodeStreamToString', () => { const str = await nodeStreamToString(source) expect(str).toBe('foobar') }) + + it('handles multi-byte UTF-8 characters split across chunks', async () => { + // The emoji is 4 bytes: f0 9f 98 80 + const emoji = Buffer.from('\u{1F600}') + expect(emoji.length).toBe(4) + + // Split the 4-byte sequence across two chunks. + const chunk1 = emoji.subarray(0, 2) // f0 9f + const chunk2 = emoji.subarray(2, 4) // 98 80 + + let index = 0 + const parts = [chunk1, chunk2] + const source = new Readable({ + read() { + if (index < parts.length) { + this.push(parts[index++]) + } else { + this.push(null) + } + }, + }) + + const str = await nodeStreamToString(source) + expect(str).toBe('\u{1F600}') + }) }) diff --git a/packages/next/src/server/stream-utils/node-stream-helpers.ts b/packages/next/src/server/stream-utils/node-stream-helpers.ts index eac55f022d8e..b0d284011c22 100644 --- a/packages/next/src/server/stream-utils/node-stream-helpers.ts +++ b/packages/next/src/server/stream-utils/node-stream-helpers.ts @@ -16,12 +16,51 @@ * This addresses the review feedback from @lubieowoce on PRs #89859/#90500 * where ALS context was incorrectly propagated by wrapping the callback * return value rather than binding the callback itself. + * + * **Edge/DCE safety**: `node:stream` is loaded lazily via `require()` so + * that this module can be safely imported (and tree-shaken) in Edge and + * client bundles without causing a hard failure at import time. */ -import { Transform, Readable, PassThrough } from 'node:stream' +import type { Transform, Readable } from 'node:stream' import type { ServerResponse } from 'node:http' import { bindSnapshot } from '../app-render/async-local-storage' import { DetachedPromise } from '../../lib/detached-promise' +import { scheduleImmediate } from '../../lib/scheduler' + +// --------------------------------------------------------------------------- +// Lazy node:stream accessor (Edge/DCE-safe) +// --------------------------------------------------------------------------- + +/** + * Lazily loads `node:stream` at runtime. This avoids a top-level import that + * would break Edge runtime and prevents Webpack/Turbopack from pulling the + * module into client bundles. + */ +function getNodeStream(): typeof import('node:stream') { + return require('node:stream') as typeof import('node:stream') +} + +// --------------------------------------------------------------------------- +// safePipe – error-forwarding pipe helper +// --------------------------------------------------------------------------- + +/** + * Pipes `source` into `dest` while ensuring errors on the source stream + * are forwarded to the destination. The built-in `.pipe()` does NOT forward + * errors, so without this helper an erroring source can leave the + * destination hanging. + */ +function safePipe( + source: Readable, + dest: T, + opts?: { end?: boolean } +): T { + source.on('error', (err) => { + if (!dest.destroyed) dest.destroy(err) + }) + return source.pipe(dest, opts) +} // --------------------------------------------------------------------------- // chainNodeStreams @@ -36,6 +75,8 @@ import { DetachedPromise } from '../../lib/detached-promise' * `node-web-streams-helper.ts`. */ export function chainNodeStreams(...streams: Readable[]): Readable { + const { PassThrough } = getNodeStream() + if (streams.length === 0) { // Return an immediately-ended readable (equivalent to ReadableStream that // closes in start()). @@ -67,15 +108,11 @@ export function chainNodeStreams(...streams: Readable[]): Readable { pipeNext() }) - const onError = bindSnapshot((err: Error) => { - output.destroy(err) - }) - // `end: false` prevents the PassThrough from closing when each // individual source ends -- we close it manually after the last one. - current.pipe(output, { end: false }) + // Use safePipe so errors on the source propagate to the output. + safePipe(current, output, { end: false }) current.once('end', onEnd) - current.once('error', onError) } pipeNext() @@ -92,7 +129,7 @@ export function chainNodeStreams(...streams: Readable[]): Readable { * downstream. Chunks are accumulated until either: * - The buffer reaches `maxBufferBytes`, at which point it flushes * synchronously, or - * - A `setImmediate` tick fires, flushing whatever has been buffered. + * - A scheduled immediate tick fires, flushing whatever has been buffered. * * This is the Node.js equivalent of `createBufferedTransformStream()` in * `node-web-streams-helper.ts`. @@ -100,9 +137,11 @@ export function chainNodeStreams(...streams: Readable[]): Readable { export function createBufferedTransformNode( maxBufferBytes: number = Infinity ): Transform { + const { Transform: NodeTransform } = getNodeStream() + let bufferedChunks: Buffer[] = [] let bufferByteLength = 0 - let pendingFlush: ReturnType | null = null + let pendingFlush = false function flushBuffer(transform: Transform) { if (bufferedChunks.length === 0) return @@ -114,7 +153,7 @@ export function createBufferedTransformNode( transform.push(merged) } - return new Transform({ + return new NodeTransform({ transform: bindSnapshot(function ( this: Transform, chunk: Buffer, @@ -126,10 +165,8 @@ export function createBufferedTransformNode( if (bufferByteLength >= maxBufferBytes) { // Flush synchronously when the buffer is large enough. - if (pendingFlush !== null) { - clearImmediate(pendingFlush) - pendingFlush = null - } + // Mark pendingFlush as false so the scheduled callback is a no-op. + pendingFlush = false flushBuffer(this) callback() return @@ -137,10 +174,12 @@ export function createBufferedTransformNode( // Schedule a flush on the next event loop iteration so that multiple // small chunks arriving in the same tick get batched together. - if (pendingFlush === null) { - pendingFlush = setImmediate( + if (!pendingFlush) { + pendingFlush = true + scheduleImmediate( bindSnapshot(() => { - pendingFlush = null + if (!pendingFlush) return + pendingFlush = false flushBuffer(this) }) ) @@ -152,10 +191,8 @@ export function createBufferedTransformNode( this: Transform, callback: (error?: Error | null) => void ) { - if (pendingFlush !== null) { - clearImmediate(pendingFlush) - pendingFlush = null - } + // Cancel any pending scheduled flush by marking it as handled. + pendingFlush = false flushBuffer(this) callback() }), @@ -184,6 +221,8 @@ export function createInlinedDataNodeStream( dataStream: Readable, delayDataUntilFirstHtmlChunk: boolean ): Transform { + const { Transform: NodeTransform } = getNodeStream() + let htmlStreamFinished = false let dataPullingStarted = false let dataExhausted = false @@ -196,6 +235,10 @@ export function createInlinedDataNodeStream( // `dataStream`. Used in `flush()` to wait for remaining data. const dataComplete = new DetachedPromise() + // TODO: For future tag-searching transforms, pre-compute Buffer versions + // of ENCODED_TAGS for fast C++ `Buffer.indexOf()` (~1.3-3.2x faster than + // the JS `indexOfUint8Array` used by the web helpers). + function startPulling(transform: Transform) { if (dataPullingStarted) return dataPullingStarted = true @@ -222,7 +265,7 @@ export function createInlinedDataNodeStream( if (delayDataUntilFirstHtmlChunk) { // Wait one tick so the shell can flush first, matching the web stream // helper behavior. - setImmediate( + scheduleImmediate( bindSnapshot(() => { dataStream.on('data', onData) dataStream.once('end', onEnd) @@ -236,7 +279,7 @@ export function createInlinedDataNodeStream( } } - return new Transform({ + return new NodeTransform({ transform: bindSnapshot(function ( this: Transform, chunk: Buffer, @@ -308,7 +351,7 @@ export function createInlinedDataNodeStream( /** * Pipes a Node.js Readable directly to a ServerResponse without going * through the WhatWG WritableStream conversion. This avoids the overhead - * of `pipe-readable.ts`'s `createWriterFromResponse()` → `pipeTo()` path + * of `pipe-readable.ts`'s `createWriterFromResponse()` -> `pipeTo()` path * when we already have a Node.js native stream. * * Handles backpressure via the standard `.pipe()` mechanism and properly @@ -316,7 +359,9 @@ export function createInlinedDataNodeStream( * * @param readable - The source Node.js Readable stream * @param res - The Node.js ServerResponse to write to - * @param onEnd - Optional callback invoked when piping completes + * @param onEnd - Optional callback invoked when piping completes (on + * success or client disconnect). NOT called on stream error since the + * response is already destroyed in that case. */ export function pipeNodeReadableToResponse( readable: Readable, @@ -353,6 +398,9 @@ export function pipeNodeReadableToResponse( } cleanup() done.resolve() + // Call onEnd so the caller knows piping has finished and can clean up + // resources (e.g. abort pending work). + onEnd?.() }) function cleanup() { @@ -389,8 +437,17 @@ export async function nodeStreamToBuffer(readable: Readable): Promise { /** * Collects all chunks from a Node.js Readable into a string. + * + * Uses a streaming `TextDecoder` to correctly handle multi-byte UTF-8 + * characters that may span chunk boundaries. */ export async function nodeStreamToString(readable: Readable): Promise { - const buf = await nodeStreamToBuffer(readable) - return buf.toString('utf-8') + const decoder = new TextDecoder('utf-8', { fatal: true }) + let result = '' + for await (const chunk of readable) { + result += decoder.decode(chunk, { stream: true }) + } + // Flush any remaining bytes in the decoder. + result += decoder.decode() + return result } From da22ba1e1aaa5123c30355dc16218ed0d1c21142 Mon Sep 17 00:00:00 2001 From: Benjamin Favre Date: Wed, 18 Mar 2026 13:37:26 +0000 Subject: [PATCH 3/4] =?UTF-8?q?fix:=20review=20cleanup=20=E2=80=94=20remov?= =?UTF-8?q?e=20unused=20allocation,=20use=20once=20for=20errors?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - safePipe: use .once('error') instead of .on('error') — streams emit at most one error, and once auto-removes the listener - pipeNodeReadableToResponse: remove unused DetachedPromise (was allocated per request but never awaited or returned) - createInlinedDataNodeStream: skip startPulling call when already started (avoids redundant function call per chunk) --- .../next/src/server/stream-utils/node-stream-helpers.ts | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/packages/next/src/server/stream-utils/node-stream-helpers.ts b/packages/next/src/server/stream-utils/node-stream-helpers.ts index b0d284011c22..ef6d1a698d72 100644 --- a/packages/next/src/server/stream-utils/node-stream-helpers.ts +++ b/packages/next/src/server/stream-utils/node-stream-helpers.ts @@ -56,7 +56,7 @@ function safePipe( dest: T, opts?: { end?: boolean } ): T { - source.on('error', (err) => { + source.once('error', (err) => { if (!dest.destroyed) dest.destroy(err) }) return source.pipe(dest, opts) @@ -291,7 +291,7 @@ export function createInlinedDataNodeStream( // Start pulling data on the first HTML chunk if delayed, or // immediately on construction if not delayed. - if (delayDataUntilFirstHtmlChunk) { + if (delayDataUntilFirstHtmlChunk && !dataPullingStarted) { startPulling(this) } @@ -374,11 +374,8 @@ export function pipeNodeReadableToResponse( return } - const done = new DetachedPromise() - const onFinish = bindSnapshot(() => { cleanup() - done.resolve() onEnd?.() }) @@ -387,7 +384,6 @@ export function pipeNodeReadableToResponse( if (!res.destroyed) { res.destroy(err) } - done.resolve() // Do not call onEnd on error -- the response is already destroyed. }) @@ -397,7 +393,6 @@ export function pipeNodeReadableToResponse( readable.destroy() } cleanup() - done.resolve() // Call onEnd so the caller knows piping has finished and can clean up // resources (e.g. abort pending work). onEnd?.() From e334c4b2950dbb67a3dbf6a4f8046fe2706db22d Mon Sep 17 00:00:00 2001 From: Benjamin Favre Date: Wed, 18 Mar 2026 14:23:24 +0000 Subject: [PATCH 4/4] fix: handle empty HTML stream in createInlinedDataNodeStream with delay MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When delayDataUntilFirstHtmlChunk=true and the HTML stream emits zero chunks, the transform callback is never called so startPulling() is never invoked. In flush(), dataExhausted is false, causing it to wait on dataComplete.promise which never resolves — a hang. Call startPulling() in flush() if it hasn't been called yet, so the data stream listeners are attached and can resolve the promise. Adds a regression test for the empty-HTML-stream edge case. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../stream-utils/node-stream-helpers.test.ts | 15 +++++++++++++++ .../server/stream-utils/node-stream-helpers.ts | 7 +++++++ 2 files changed, 22 insertions(+) diff --git a/packages/next/src/server/stream-utils/node-stream-helpers.test.ts b/packages/next/src/server/stream-utils/node-stream-helpers.test.ts index 37650abe9b61..37cfd12ff5ed 100644 --- a/packages/next/src/server/stream-utils/node-stream-helpers.test.ts +++ b/packages/next/src/server/stream-utils/node-stream-helpers.test.ts @@ -187,6 +187,21 @@ describe('createInlinedDataNodeStream', () => { expect(result).toBe('content') }) + it('does not hang when HTML stream is empty and delay is true', async () => { + const dataStream = readableFrom(['']) + const transform = createInlinedDataNodeStream(dataStream, true) + + // An empty HTML stream: ends immediately without emitting any chunks. + const htmlStream = readableFrom([]) + const output = htmlStream.pipe(transform) + + // Without the fix, this would hang forever because startPulling() was + // never called (no HTML chunk arrived) and flush() waited on + // dataComplete.promise which never resolved. + const result = await collectString(output) + expect(result).toContain('') + }) + it('handles data arriving after HTML finishes', async () => { // Create a data stream that emits after a delay. const dataPassthrough = new PassThrough() diff --git a/packages/next/src/server/stream-utils/node-stream-helpers.ts b/packages/next/src/server/stream-utils/node-stream-helpers.ts index ef6d1a698d72..7bbcb9ebd560 100644 --- a/packages/next/src/server/stream-utils/node-stream-helpers.ts +++ b/packages/next/src/server/stream-utils/node-stream-helpers.ts @@ -312,6 +312,13 @@ export function createInlinedDataNodeStream( ) { htmlStreamFinished = true + // If data pulling was delayed and no HTML chunk ever arrived, start + // pulling now so we don't hang waiting for a promise that will never + // resolve. + if (!dataPullingStarted) { + startPulling(this) + } + // Flush any remaining pending data chunks. for (const dataChunk of pendingDataChunks) { this.push(dataChunk)