diff --git a/packages/next/src/server/app-render/app-render.tsx b/packages/next/src/server/app-render/app-render.tsx index 21dfa9ab20a0..15edea0daa2c 100644 --- a/packages/next/src/server/app-render/app-render.tsx +++ b/packages/next/src/server/app-render/app-render.tsx @@ -42,7 +42,8 @@ import { continueFizzStream, continueDynamicPrerender, continueStaticPrerender, - continueDynamicHTMLResume, + continueDynamicHTMLResumeNode, + continueDynamicHTMLResumeWeb, continueStaticFallbackPrerender, streamToBuffer, streamToString, @@ -845,42 +846,77 @@ async function generateDynamicFlightRenderResult( onFlightDataRenderError ) - const debugChannel = setReactDebugChannel && createWebDebugChannel() + if (process.env.__NEXT_USE_NODE_STREAMS) { + const debugChannel = setReactDebugChannel && createNodeDebugChannel() - if (debugChannel) { - setReactDebugChannel(debugChannel.clientSide, htmlRequestId, requestId) - } + if (debugChannel) { + setReactDebugChannel(debugChannel.clientSide, htmlRequestId, requestId) + } - const { clientModules } = getClientReferenceManifest() + const { clientModules } = getClientReferenceManifest() - // For app dir, use the bundled version of Flight server renderer (renderToReadableStream) - // which contains the subset React. - const rscPayload = await workUnitAsyncStorage.run( - requestStore, - generateDynamicRSCPayload, - ctx, - options - ) + const rscPayload = await workUnitAsyncStorage.run( + requestStore, + generateDynamicRSCPayload, + ctx, + options + ) - const flightStream = workUnitAsyncStorage.run( - requestStore, - renderToWebFlightStream, - ctx.componentMod, - rscPayload, - clientModules, - { - onError, - temporaryReferences: options?.temporaryReferences, - filterStackFrame, - debugChannel: debugChannel?.serverSide, + const flightStream = workUnitAsyncStorage.run( + requestStore, + renderToNodeFlightStream, + ctx.componentMod, + rscPayload, + clientModules, + { + onError, + temporaryReferences: options?.temporaryReferences, + filterStackFrame, + debugChannel: debugChannel?.serverSide, + } + ) + + return new FlightRenderResult( + flightStream, + { fetchMetrics: workStore.fetchMetrics }, + options?.waitUntil + ) + } else { + const debugChannel = setReactDebugChannel && createWebDebugChannel() + + if (debugChannel) { + setReactDebugChannel(debugChannel.clientSide, htmlRequestId, requestId) } - ) - return new FlightRenderResult( - flightStream, - { fetchMetrics: workStore.fetchMetrics }, - options?.waitUntil - ) + const { clientModules } = getClientReferenceManifest() + + const rscPayload = await workUnitAsyncStorage.run( + requestStore, + generateDynamicRSCPayload, + ctx, + options + ) + + const flightStream = workUnitAsyncStorage.run( + requestStore, + renderToWebFlightStream, + ctx.componentMod, + rscPayload, + clientModules, + { + onError, + temporaryReferences: options?.temporaryReferences, + filterStackFrame, + debugChannel: debugChannel?.serverSide, + } + ) + + return new FlightRenderResult( + flightStream, + { fetchMetrics: workStore.fetchMetrics }, + options?.waitUntil + ) + } } /** @@ -888,7 +924,7 @@ async function generateDynamicFlightRenderResult( * staged rendering to separate static (RDC-backed) from runtime/dynamic * content. */ -async function generateStagedDynamicFlightRenderResult( +async function generateStagedDynamicFlightRenderResultWeb( req: BaseNextRequest, ctx: AppRenderContext, requestStore: RequestStore @@ -1042,6 +1078,168 @@ async function generateStagedDynamicFlightRenderResult( }) } +/** + * Production-only staged dynamic flight render for cache components (Node.js + * streams). Uses staged rendering to separate static (RDC-backed) from + * runtime/dynamic content. + */ +async function generateStagedDynamicFlightRenderResultNode( + req: BaseNextRequest, + ctx: AppRenderContext, + requestStore: RequestStore +): Promise { + const { componentMod, workStore, renderOpts } = ctx + const { routeModule } = componentMod + const { loaderTree } = routeModule.userland + const { onInstrumentationRequestError, experimental } = renderOpts + + function onFlightDataRenderError(err: DigestedError, silenceLog: boolean) { + return onInstrumentationRequestError?.( + err, + req, + createErrorContext(ctx, 'react-server-components-payload'), + silenceLog + ) + } + + const onError = createReactServerErrorHandler( + false, + false, + workStore.reactServerErrorsByDigest, + onFlightDataRenderError + ) + + const selectStaleTime = createSelectStaleTime(experimental) + const staleTimeIterable = new StaleTimeIterable() + + // TODO(cached-navs): this assumes that we checked during build that there's no sync IO. + // but it can happen e.g. after a revalidation or conditionally for a param that wasn't prerendered. + // we should change this to track sync IO, log an error and advance to dynamic. + const shouldTrackSyncIO = false + const stageController = new StagedRenderingController( + null, // no aborting + null, // no abandoning + shouldTrackSyncIO + ) + + // Initialize stale time tracking on the request store. + requestStore.stale = INFINITE_CACHE + requestStore.stagedRendering = stageController + requestStore.varyParamsAccumulator = createResponseVaryParamsAccumulator() + requestStore.asyncApiPromises = createAsyncApiPromises( + stageController, + requestStore.cookies, + requestStore.mutableCookies, + requestStore.headers + ) + + trackStaleTime( + requestStore as { stale: number }, + staleTimeIterable, + selectStaleTime + ) + + // Deferred promise for the static stage byte length. Flight serializes the + // resolved value into the stream so the client knows where the static + // prefix ends. + let resolveStaticStageByteLength: (count: number) => void + const staticStageByteLengthPromise = new Promise((resolve) => { + resolveStaticStageByteLength = resolve + }) + + // Check if this route has opted into runtime prefetching via + // unstable_instant. If so, we piggyback on the dynamic render to fill caches + // and then spawn a final runtime prerender whose result stream is embedded in + // the RSC payload. This is gated on the explicit opt-in because it adds extra + // server processing, increases the response payload size, and the runtime + // prefetch output should have been validated first. + const hasRuntimePrefetch = + await anySegmentHasRuntimePrefetchEnabled(loaderTree) + + let runtimePrefetchStream: ReadableStream | undefined + + if (hasRuntimePrefetch) { + // Create a mutable cache that gets filled during the dynamic render. + const prerenderResumeDataCache = createPrerenderResumeDataCache() + requestStore.prerenderResumeDataCache = prerenderResumeDataCache + + const cacheSignal = new CacheSignal() + trackPendingModules(cacheSignal) + requestStore.cacheSignal = cacheSignal + + // Create a deferred stream for the runtime prefetch result. Its readable + // side goes into the RSC payload (Flight serializes it lazily). The + // writable side receives the runtime prerender result once the dynamic + // render has filled all caches. + const runtimePrefetchTransform = new TransformStream() + runtimePrefetchStream = runtimePrefetchTransform.readable + + // Wait for the dynamic render to fill caches, then run the final runtime + // prerender (fire-and-forget — does not block the response). + void cacheSignal + .cacheReady() + .then(() => + spawnRuntimePrefetchWithFilledCaches( + runtimePrefetchTransform.writable, + ctx, + prerenderResumeDataCache, + requestStore, + onError + ) + ) + } + + const rscPayload = await workUnitAsyncStorage.run( + requestStore, + generateDynamicRSCPayload, + ctx, + { staleTimeIterable, staticStageByteLengthPromise, runtimePrefetchStream } + ) + + const { clientModules } = getClientReferenceManifest() + + const flightStream = await runInSequentialTasks( + () => { + stageController.advanceStage(RenderStage.Static) + + const sourceStream = workUnitAsyncStorage.run( + requestStore, + renderToNodeFlightStream, + ctx.componentMod, + rscPayload, + clientModules, + { onError, filterStackFrame } + ) as Readable + + const replayable = new ReplayableNodeStream(sourceStream) + const dynamicStream = replayable.createReplayStream() + const staticStream = replayable.createReplayStream() + + countStaticStageBytesNode(staticStream, stageController).then( + resolveStaticStageByteLength + ) + + return dynamicStream + }, + () => { + // This is a separate task that doesn't advance a stage. It forces + // draining the microtask queue so that the stale time iterable and vary + // params accumulators are closed before we advance to the dynamic stage. + void finishStaleTimeTracking(staleTimeIterable) + if (requestStore.varyParamsAccumulator) { + void finishAccumulatingVaryParams(requestStore.varyParamsAccumulator) + } + }, + () => { + stageController.advanceStage(RenderStage.Dynamic) + } + ) + + return new FlightRenderResult(flightStream, { + fetchMetrics: workStore.fetchMetrics, + }) +} + /** * Runs a final runtime prerender using the provided (already filled) cache and * pipes its output into the provided writable stream. The caller is responsible @@ -2718,8 +2916,10 @@ async function renderToHTMLOrFlightImpl( }) } + // MARK: RSC request if (isRSCRequest) { if (isRuntimePrefetchRequest) { + // MARK: RSC runtimePrefetch return generateRuntimePrefetchResult(req, ctx, requestStore) } else { if ( @@ -2727,6 +2927,7 @@ async function renderToHTMLOrFlightImpl( process.env.NEXT_RUNTIME !== 'edge' && cacheComponents ) { + // MARK: RSC devCacheComponents return generateDynamicFlightRenderResultWithStagesInDev( req, ctx, @@ -2735,8 +2936,22 @@ async function renderToHTMLOrFlightImpl( fallbackParams ) } else if (cacheComponents && cachedNavigations) { - return generateStagedDynamicFlightRenderResult(req, ctx, requestStore) + // MARK: RSC cacheComponents + if (process.env.__NEXT_USE_NODE_STREAMS) { + return generateStagedDynamicFlightRenderResultNode( + req, + ctx, + requestStore + ) + } else { + return generateStagedDynamicFlightRenderResultWeb( + req, + ctx, + requestStore + ) + } } else { + // MARK: RSC dynamic return generateDynamicFlightRenderResult(req, ctx, requestStore) } } @@ -3783,7 +3998,7 @@ async function renderToStream( // We have a complete HTML Document in the prerender but we need to // still include the new server component render because it was not included // in the static prelude. - const inlinedDataStream = createWebInlinedDataStream( + const inlinedDataStream = createNodeInlinedDataStream( reactServerResult.tee(), nonce, formState @@ -3834,10 +4049,10 @@ async function renderToStream( if (renderSpan.isRecording()) renderSpan.end() }) - return await continueDynamicHTMLResume(htmlStream, { + return await continueDynamicHTMLResumeNode(htmlStream, { delayDataUntilFirstHtmlChunk: preludeState === DynamicHTMLPreludeState.Empty, - inlinedDataStream: createWebInlinedDataStream( + inlinedDataStream: createNodeInlinedDataStream( reactServerResult.consume(), nonce, formState @@ -3973,7 +4188,7 @@ async function renderToStream( if (renderSpan.isRecording()) renderSpan.end() }) - return await continueDynamicHTMLResume(htmlStream, { + return await continueDynamicHTMLResumeWeb(htmlStream, { delayDataUntilFirstHtmlChunk: preludeState === DynamicHTMLPreludeState.Empty, inlinedDataStream: createWebInlinedDataStream( @@ -5243,12 +5458,22 @@ async function logMessagesAndSendErrorsToBrowser( const { clientModules } = getClientReferenceManifest() - const errorsFlightStream = renderToWebFlightStream( - ctx.componentMod, - { errors, errorCodes }, - clientModules, - { filterStackFrame } - ) + let errorsFlightStream: AnyStream + if (process.env.__NEXT_USE_NODE_STREAMS) { + errorsFlightStream = renderToNodeFlightStream( + ctx.componentMod, + { errors, errorCodes }, + clientModules, + { filterStackFrame } + ) + } else { + errorsFlightStream = renderToWebFlightStream( + ctx.componentMod, + { errors, errorCodes }, + clientModules, + { filterStackFrame } + ) + } sendErrorsToBrowser(errorsFlightStream, htmlRequestId) } diff --git a/packages/next/src/server/app-render/stream-ops.node.ts b/packages/next/src/server/app-render/stream-ops.node.ts index c4bbd483d846..2e8412e74396 100644 --- a/packages/next/src/server/app-render/stream-ops.node.ts +++ b/packages/next/src/server/app-render/stream-ops.node.ts @@ -812,7 +812,52 @@ export async function continueStaticFallbackPrerender( return webToReadable(webResult) } -export async function continueDynamicHTMLResume( +export async function continueDynamicHTMLResumeNode( + renderStream: AnyStream, + { + delayDataUntilFirstHtmlChunk, + inlinedDataStream, + getServerInsertedHTML, + getServerInsertedMetadata, + deploymentId, + }: import('./stream-ops.web').ContinueDynamicHTMLResumeOptions +): Promise { + await waitAtLeastOneReactRenderTask() + + const buffered = createBufferedTransformStream() + webToReadable(renderStream).pipe(buffered) + + let source: Readable = buffered + + if (deploymentId) { + const dplId = createHtmlDataDplIdTransform(deploymentId) + source.pipe(dplId) + source = dplId + } + + const headInsertion = createHeadInsertionTransform(getServerInsertedHTML) + source.pipe(headInsertion) + source = headInsertion + + const metadata = createMetadataTransform(getServerInsertedMetadata) + source.pipe(metadata) + source = metadata + + const flightInjection = createFlightDataInjectionTransform( + webToReadable(inlinedDataStream), + delayDataUntilFirstHtmlChunk + ) + source.pipe(flightInjection) + source = flightInjection + + const moveSuffix = createMoveSuffixTransform() + source.pipe(moveSuffix) + source = moveSuffix + + return source +} + +export async function continueDynamicHTMLResumeWeb( renderStream: AnyStream, opts: import('./stream-ops.web').ContinueDynamicHTMLResumeOptions ): Promise { diff --git a/packages/next/src/server/app-render/stream-ops.ts b/packages/next/src/server/app-render/stream-ops.ts index 015a035bf1e5..f28b6dc2187e 100644 --- a/packages/next/src/server/app-render/stream-ops.ts +++ b/packages/next/src/server/app-render/stream-ops.ts @@ -35,7 +35,8 @@ export const continueStaticPrerender = _m.continueStaticPrerender export const continueDynamicPrerender = _m.continueDynamicPrerender export const continueStaticFallbackPrerender = _m.continueStaticFallbackPrerender -export const continueDynamicHTMLResume = _m.continueDynamicHTMLResume +export const continueDynamicHTMLResumeNode = _m.continueDynamicHTMLResumeNode +export const continueDynamicHTMLResumeWeb = _m.continueDynamicHTMLResumeWeb export const streamToBuffer = _m.streamToBuffer export const chainStreams = _m.chainStreams export const createDocumentClosingStream = _m.createDocumentClosingStream diff --git a/packages/next/src/server/app-render/stream-ops.web.ts b/packages/next/src/server/app-render/stream-ops.web.ts index cf3b47d702f6..cb755e92c03f 100644 --- a/packages/next/src/server/app-render/stream-ops.web.ts +++ b/packages/next/src/server/app-render/stream-ops.web.ts @@ -141,7 +141,7 @@ export async function continueStaticFallbackPrerender( ) } -export async function continueDynamicHTMLResume( +export async function continueDynamicHTMLResumeWeb( renderStream: AnyStream, opts: ContinueDynamicHTMLResumeOptions ): Promise { @@ -154,6 +154,13 @@ export async function continueDynamicHTMLResume( ) } +export function continueDynamicHTMLResumeNode( + _renderStream: AnyStream, + _opts: ContinueDynamicHTMLResumeOptions +): Promise { + throw new Error('not implemented') +} + export async function streamToBuffer(stream: AnyStream): Promise { return webStreamToBuffer(stream as ReadableStream) }