From b053f38e25a029d9966920ab946c1e0966e69b9b Mon Sep 17 00:00:00 2001 From: Martin Sonnberger Date: Tue, 26 Aug 2025 11:36:48 +0200 Subject: [PATCH 1/3] feat(aws): Add support for streaming handlers --- .../Streaming/index.mjs | 8 + .../aws-serverless/tests/layer.test.ts | 48 +++++ .../instrumentation.ts | 189 +++++++++++++----- packages/aws-serverless/src/sdk.ts | 142 ++++++++++--- packages/aws-serverless/test/sdk.test.ts | 187 ++++++++++++++++- 5 files changed, 493 insertions(+), 81 deletions(-) create mode 100644 dev-packages/e2e-tests/test-applications/aws-serverless/src/lambda-functions-layer/Streaming/index.mjs diff --git a/dev-packages/e2e-tests/test-applications/aws-serverless/src/lambda-functions-layer/Streaming/index.mjs b/dev-packages/e2e-tests/test-applications/aws-serverless/src/lambda-functions-layer/Streaming/index.mjs new file mode 100644 index 000000000000..d46b9df502b9 --- /dev/null +++ b/dev-packages/e2e-tests/test-applications/aws-serverless/src/lambda-functions-layer/Streaming/index.mjs @@ -0,0 +1,8 @@ +import * as Sentry from '@sentry/aws-serverless'; + +export const handler = awslambda.streamifyResponse(async (event, responseStream, context) => { + Sentry.startSpan({ name: 'manual-span', op: 'test' }, async () => { + responseStream.write('Hello, world!'); + responseStream.end(); + }); +}); diff --git a/dev-packages/e2e-tests/test-applications/aws-serverless/tests/layer.test.ts b/dev-packages/e2e-tests/test-applications/aws-serverless/tests/layer.test.ts index c20659835ee8..4d68efb66b08 100644 --- a/dev-packages/e2e-tests/test-applications/aws-serverless/tests/layer.test.ts +++ b/dev-packages/e2e-tests/test-applications/aws-serverless/tests/layer.test.ts @@ -194,4 +194,52 @@ test.describe('Lambda layer', () => { }), ); }); + + test('streaming handlers work', async ({ lambdaClient }) => { + const transactionEventPromise = waitForTransaction('aws-serverless-lambda-sam', transactionEvent => { + return transactionEvent?.transaction === 'LayerStreaming'; + }); + + await lambdaClient.send( + new InvokeCommand({ + FunctionName: 'LayerStreaming', + Payload: JSON.stringify({}), + }), + ); + + const transactionEvent = await transactionEventPromise; + + expect(transactionEvent.transaction).toEqual('LayerStreaming'); + expect(transactionEvent.contexts?.trace).toEqual({ + data: { + 'sentry.sample_rate': 1, + 'sentry.source': 'custom', + 'sentry.origin': 'auto.otel.aws-lambda', + 'sentry.op': 'function.aws.lambda', + 'cloud.account.id': '012345678912', + 'faas.execution': expect.any(String), + 'faas.id': 'arn:aws:lambda:us-east-1:012345678912:function:LayerStreaming', + 'faas.coldstart': true, + 'otel.kind': 'SERVER', + }, + op: 'function.aws.lambda', + origin: 'auto.otel.aws-lambda', + span_id: expect.stringMatching(/[a-f0-9]{16}/), + status: 'ok', + trace_id: expect.stringMatching(/[a-f0-9]{32}/), + }); + + expect(transactionEvent.spans).toHaveLength(1); + + expect(transactionEvent.spans).toContainEqual( + expect.objectContaining({ + data: expect.objectContaining({ + 'sentry.op': 'test', + 'sentry.origin': 'manual', + }), + description: 'manual-span', + op: 'test', + }), + ); + }); }); diff --git a/packages/aws-serverless/src/integration/instrumentation-aws-lambda/instrumentation.ts b/packages/aws-serverless/src/integration/instrumentation-aws-lambda/instrumentation.ts index 39b63551b2aa..94df0311f12e 100644 --- a/packages/aws-serverless/src/integration/instrumentation-aws-lambda/instrumentation.ts +++ b/packages/aws-serverless/src/integration/instrumentation-aws-lambda/instrumentation.ts @@ -3,6 +3,7 @@ // - Added Sentry `wrapHandler` around the OTel patch handler. // - Cancel init when handler string is invalid (TS) // - Hardcoded package version and name +// - Added support for streaming handlers /* eslint-disable */ /* * Copyright The OpenTelemetry Authors @@ -50,7 +51,7 @@ import { SEMRESATTRS_CLOUD_ACCOUNT_ID, SEMRESATTRS_FAAS_ID, } from '@opentelemetry/semantic-conventions'; -import type { APIGatewayProxyEventHeaders, Callback, Context, Handler } from 'aws-lambda'; +import type { APIGatewayProxyEventHeaders, Callback, Context, Handler, StreamifyHandler } from 'aws-lambda'; import * as fs from 'fs'; import * as path from 'path'; import type { LambdaModule } from './internal-types'; @@ -73,6 +74,8 @@ const headerGetter: TextMapGetter = { }; export const lambdaMaxInitInMilliseconds = 10_000; +const AWS_HANDLER_STREAMING_SYMBOL = Symbol.for('aws.lambda.runtime.handler.streaming'); +const AWS_HANDLER_STREAMING_RESPONSE = 'response'; /** * @@ -101,6 +104,18 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { + // Add the streaming symbols that the instrumentation looks for + handler[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE; + return handler; + }, + }; + } + const handler = path.basename(handlerDef); const moduleRoot = handlerDef.substring(0, handlerDef.length - handler.length); @@ -187,16 +202,32 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { - return wrapHandler(this._getPatchHandler(original, handlerLoadStartTime)); + private _getHandler(handlerLoadStartTime: number) { + return (original: T): T => { + if (this._isStreamingHandler(original)) { + const patchedHandler = this._getPatchHandler(original, handlerLoadStartTime); + + // Streaming handlers have special symbols that we need to copy over to the patched handler. + for (const symbol of Object.getOwnPropertySymbols(original)) { + (patchedHandler as unknown as Record)[symbol] = ( + original as unknown as Record + )[symbol]; + } + + return wrapHandler(patchedHandler) as T; + } + + return wrapHandler(this._getPatchHandler(original, handlerLoadStartTime)) as T; }; } + private _getPatchHandler(original: Handler, lambdaStartTime: number): Handler; + private _getPatchHandler(original: StreamifyHandler, lambdaStartTime: number): StreamifyHandler; + /** * */ - private _getPatchHandler(original: Handler, lambdaStartTime: number) { + private _getPatchHandler(original: Handler | StreamifyHandler, lambdaStartTime: number): Handler | StreamifyHandler { diag.debug('patch handler function'); const plugin = this; @@ -229,6 +260,36 @@ export class AwsLambdaInstrumentation extends InstrumentationBase[1], + context: Context, + ) { + _onRequest(); + const parent = plugin._determineParent(event, context); + const span = plugin._createSpanForRequest(event, context, requestIsColdStart, parent); + plugin._applyRequestHook(span, event, context); + + return otelContext.with(trace.setSpan(parent, span), () => { + const maybePromise = safeExecuteInTheMiddle( + () => original.apply(this, [event, responseStream, context]), + error => { + if (error != null) { + // Exception thrown synchronously before resolving promise. + plugin._applyResponseHook(span, error); + plugin._endSpan(span, error, () => {}); + } + }, + ) as Promise<{}> | undefined; + + return plugin._handlePromiseResult(span, maybePromise); + }); + }; + } + return function patchedHandler( this: never, // The event can be a user type, it truly is any. @@ -239,39 +300,10 @@ export class AwsLambdaInstrumentation extends InstrumentationBase requestHook(span, { event, context }), - e => { - if (e) diag.error('aws-lambda instrumentation: requestHook error', e); - }, - true, - ); - } + const span = plugin._createSpanForRequest(event, context, requestIsColdStart, parent); + plugin._applyRequestHook(span, event, context); return otelContext.with(trace.setSpan(parent, span), () => { // Lambda seems to pass a callback even if handler is of Promise form, so we wrap all the time before calling @@ -289,23 +321,80 @@ export class AwsLambdaInstrumentation extends InstrumentationBase | undefined; - if (typeof maybePromise?.then === 'function') { - return maybePromise.then( - value => { - plugin._applyResponseHook(span, null, value); - return new Promise(resolve => plugin._endSpan(span, undefined, () => resolve(value))); - }, - (err: Error | string) => { - plugin._applyResponseHook(span, err); - return new Promise((resolve, reject) => plugin._endSpan(span, err, () => reject(err))); - }, - ); - } - return maybePromise; + + return plugin._handlePromiseResult(span, maybePromise); }); }; } + private _createSpanForRequest(event: any, context: Context, requestIsColdStart: boolean, parent: OtelContext): Span { + const name = context.functionName; + return this.tracer.startSpan( + name, + { + kind: SpanKind.SERVER, + attributes: { + [SEMATTRS_FAAS_EXECUTION]: context.awsRequestId, + [SEMRESATTRS_FAAS_ID]: context.invokedFunctionArn, + [SEMRESATTRS_CLOUD_ACCOUNT_ID]: AwsLambdaInstrumentation._extractAccountId(context.invokedFunctionArn), + [ATTR_FAAS_COLDSTART]: requestIsColdStart, + ...AwsLambdaInstrumentation._extractOtherEventFields(event), + }, + }, + parent, + ); + } + + private _applyRequestHook(span: Span, event: any, context: Context): void { + const { requestHook } = this.getConfig(); + if (requestHook) { + safeExecuteInTheMiddle( + () => requestHook(span, { event, context }), + e => { + if (e) diag.error('aws-lambda instrumentation: requestHook error', e); + }, + true, + ); + } + } + + private _handlePromiseResult(span: Span, maybePromise: Promise<{}> | undefined): Promise<{}> | undefined { + if (typeof maybePromise?.then === 'function') { + return maybePromise.then( + value => { + this._applyResponseHook(span, null, value); + return new Promise(resolve => this._endSpan(span, undefined, () => resolve(value))); + }, + (err: Error | string) => { + this._applyResponseHook(span, err); + return new Promise((resolve, reject) => this._endSpan(span, err, () => reject(err))); + }, + ); + } + + // Handle synchronous return values by ending the span and applying response hook + this._applyResponseHook(span, null, maybePromise); + this._endSpan(span, undefined, () => {}); + return maybePromise; + } + + private _determineParent(event: any, context: Context): OtelContext { + const config = this.getConfig(); + return AwsLambdaInstrumentation._determineParent( + event, + context, + config.eventContextExtractor || AwsLambdaInstrumentation._defaultEventContextExtractor, + ); + } + + private _isStreamingHandler( + handler: Handler | StreamifyHandler, + ): handler is StreamifyHandler { + return ( + (handler as unknown as Record)[AWS_HANDLER_STREAMING_SYMBOL] === AWS_HANDLER_STREAMING_RESPONSE + ); + } + /** * */ diff --git a/packages/aws-serverless/src/sdk.ts b/packages/aws-serverless/src/sdk.ts index e6f7d5f3a4f0..32da13d3cc00 100644 --- a/packages/aws-serverless/src/sdk.ts +++ b/packages/aws-serverless/src/sdk.ts @@ -1,7 +1,7 @@ import type { Scope } from '@sentry/core'; import { consoleSandbox, debug } from '@sentry/core'; import { captureException, captureMessage, flush, getCurrentScope, withScope } from '@sentry/node'; -import type { Context, Handler } from 'aws-lambda'; +import type { Context, Handler, StreamifyHandler } from 'aws-lambda'; import { performance } from 'perf_hooks'; import { types } from 'util'; import { DEBUG_BUILD } from './debug-build'; @@ -108,6 +108,51 @@ function enhanceScopeWithEnvironmentData(scope: Scope, context: Context, startTi }); } +function setupTimeoutWatning(context: Context, options: WrapperOptions): NodeJS.Timeout | undefined { + // In seconds. You cannot go any more granular than this in AWS Lambda. + const configuredTimeout = Math.ceil(tryGetRemainingTimeInMillis(context) / 1000); + const configuredTimeoutMinutes = Math.floor(configuredTimeout / 60); + const configuredTimeoutSeconds = configuredTimeout % 60; + + const humanReadableTimeout = + configuredTimeoutMinutes > 0 + ? `${configuredTimeoutMinutes}m${configuredTimeoutSeconds}s` + : `${configuredTimeoutSeconds}s`; + + if (options.captureTimeoutWarning) { + const timeoutWarningDelay = tryGetRemainingTimeInMillis(context) - options.timeoutWarningLimit; + + return setTimeout(() => { + withScope(scope => { + scope.setTag('timeout', humanReadableTimeout); + captureMessage(`Possible function timeout: ${context.functionName}`, 'warning'); + }); + }, timeoutWarningDelay) as unknown as NodeJS.Timeout; + } + + return undefined; +} + +export const AWS_HANDLER_HIGHWATERMARK_SYMBOL = Symbol.for('aws.lambda.runtime.handler.highWaterMark'); +export const AWS_HANDLER_STREAMING_SYMBOL = Symbol.for('aws.lambda.runtime.handler.streaming'); +export const AWS_HANDLER_STREAMING_RESPONSE = 'response'; + +function isStreamingHandler(handler: Handler | StreamifyHandler): handler is StreamifyHandler { + return ( + (handler as unknown as Record)[AWS_HANDLER_STREAMING_SYMBOL] === AWS_HANDLER_STREAMING_RESPONSE + ); +} + +export function wrapHandler( + handler: Handler, + wrapOptions?: Partial, +): Handler; + +export function wrapHandler( + handler: StreamifyHandler, + wrapOptions?: Partial, +): StreamifyHandler; + /** * Wraps a lambda handler adding it error capture and tracing capabilities. * @@ -116,9 +161,9 @@ function enhanceScopeWithEnvironmentData(scope: Scope, context: Context, startTi * @returns Handler */ export function wrapHandler( - handler: Handler, + handler: Handler | StreamifyHandler, wrapOptions: Partial = {}, -): Handler { +): Handler | StreamifyHandler { const START_TIME = performance.now(); // eslint-disable-next-line deprecation/deprecation @@ -141,18 +186,22 @@ export function wrapHandler( ...wrapOptions, }; - let timeoutWarningTimer: NodeJS.Timeout; + if (isStreamingHandler(handler)) { + return wrapStreamingHandler(handler, options, START_TIME); + } + + let timeoutWarningTimer: NodeJS.Timeout | undefined; // AWSLambda is like Express. It makes a distinction about handlers based on its last argument // async (event) => async handler // async (event, context) => async handler // (event, context, callback) => sync handler // Nevertheless whatever option is chosen by user, we convert it to async handler. - const asyncHandler: AsyncHandler = + const asyncHandler: AsyncHandler> = handler.length > 2 ? (event, context) => new Promise((resolve, reject) => { - const rv = (handler as SyncHandler)(event, context, (error, result) => { + const rv = (handler as SyncHandler>)(event, context, (error, result) => { if (error === null || error === undefined) { resolve(result!); // eslint-disable-line @typescript-eslint/no-non-null-assertion } else { @@ -166,33 +215,12 @@ export function wrapHandler( void (rv as Promise>).then(resolve, reject); } }) - : (handler as AsyncHandler); + : (handler as AsyncHandler>); - return async (event, context) => { + return async (event: TEvent, context: Context) => { context.callbackWaitsForEmptyEventLoop = options.callbackWaitsForEmptyEventLoop; - // In seconds. You cannot go any more granular than this in AWS Lambda. - const configuredTimeout = Math.ceil(tryGetRemainingTimeInMillis(context) / 1000); - const configuredTimeoutMinutes = Math.floor(configuredTimeout / 60); - const configuredTimeoutSeconds = configuredTimeout % 60; - - const humanReadableTimeout = - configuredTimeoutMinutes > 0 - ? `${configuredTimeoutMinutes}m${configuredTimeoutSeconds}s` - : `${configuredTimeoutSeconds}s`; - - // When `callbackWaitsForEmptyEventLoop` is set to false, which it should when using `captureTimeoutWarning`, - // we don't have a guarantee that this message will be delivered. Because of that, we don't flush it. - if (options.captureTimeoutWarning) { - const timeoutWarningDelay = tryGetRemainingTimeInMillis(context) - options.timeoutWarningLimit; - - timeoutWarningTimer = setTimeout(() => { - withScope(scope => { - scope.setTag('timeout', humanReadableTimeout); - captureMessage(`Possible function timeout: ${context.functionName}`, 'warning'); - }); - }, timeoutWarningDelay) as unknown as NodeJS.Timeout; - } + timeoutWarningTimer = setupTimeoutWatning(context, options); async function processResult(): Promise { const scope = getCurrentScope(); @@ -229,3 +257,57 @@ export function wrapHandler( }); }; } + +function wrapStreamingHandler( + handler: StreamifyHandler, + options: WrapperOptions, + startTime: number, +): StreamifyHandler { + let timeoutWarningTimer: NodeJS.Timeout | undefined; + + const wrappedHandler = async ( + event: TEvent, + responseStream: Parameters>[1], + context: Context, + ): Promise => { + context.callbackWaitsForEmptyEventLoop = options.callbackWaitsForEmptyEventLoop; + + timeoutWarningTimer = setupTimeoutWatning(context, options); + + async function processStreamingResult(): Promise { + const scope = getCurrentScope(); + + try { + enhanceScopeWithEnvironmentData(scope, context, startTime); + + responseStream.on('error', error => { + captureException(error, scope => markEventUnhandled(scope, 'auto.function.aws-serverless.stream')); + }); + + return await handler(event, responseStream, context); + } catch (e) { + // Errors should already captured in the instrumentation's `responseHook`, + // we capture them here just to be safe. Double captures are deduplicated by the SDK. + captureException(e, scope => markEventUnhandled(scope, 'auto.function.aws-serverless.handler')); + throw e; + } finally { + if (timeoutWarningTimer) { + clearTimeout(timeoutWarningTimer); + } + await flush(options.flushTimeout).catch(e => { + DEBUG_BUILD && debug.error(e); + }); + } + } + + return withScope(() => processStreamingResult()); + }; + + const handlerWithSymbols = handler as unknown as Record; + (wrappedHandler as unknown as Record)[AWS_HANDLER_STREAMING_SYMBOL] = + handlerWithSymbols[AWS_HANDLER_STREAMING_SYMBOL]; + (wrappedHandler as unknown as Record)[AWS_HANDLER_HIGHWATERMARK_SYMBOL] = + handlerWithSymbols[AWS_HANDLER_HIGHWATERMARK_SYMBOL]; + + return wrappedHandler; +} diff --git a/packages/aws-serverless/test/sdk.test.ts b/packages/aws-serverless/test/sdk.test.ts index 58bb04a234b9..25ed0411c3d0 100644 --- a/packages/aws-serverless/test/sdk.test.ts +++ b/packages/aws-serverless/test/sdk.test.ts @@ -2,7 +2,7 @@ import type { Event } from '@sentry/core'; import type { Callback, Handler } from 'aws-lambda'; import { beforeEach, describe, expect, test, vi } from 'vitest'; import { init } from '../src/init'; -import { wrapHandler } from '../src/sdk'; +import { AWS_HANDLER_STREAMING_SYMBOL, AWS_HANDLER_STREAMING_RESPONSE, wrapHandler } from '../src/sdk'; const mockFlush = vi.fn((...args) => Promise.resolve(args)); const mockWithScope = vi.fn(); @@ -368,6 +368,191 @@ describe('AWSLambda', () => { }); }); + describe('wrapHandler() on streaming handlers', () => { + // Mock response stream with common stream interface + const mockResponseStream = { + write: vi.fn(), + end: vi.fn(), + destroy: vi.fn(), + on: vi.fn(), + setContentType: vi.fn(), + writable: true, + writableEnded: false, + writableFinished: false, + }; + + beforeEach(() => { + vi.clearAllMocks(); + mockResponseStream.write.mockClear(); + mockResponseStream.end.mockClear(); + mockResponseStream.destroy.mockClear(); + mockResponseStream.on.mockClear(); + }); + + test('successful execution', async () => { + expect.assertions(5); + + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + return 42; + }); + // Add the streaming symbol to mark it as a streaming handler + (streamingHandler as any)[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE; + + const wrappedHandler = wrapHandler(streamingHandler); + const rv = await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + + expect(rv).toStrictEqual(42); + expectScopeSettings(); + expect(streamingHandler).toHaveBeenCalledWith(fakeEvent, mockResponseStream, fakeContext); + expect(mockFlush).toBeCalledWith(2000); + }); + + test('preserves streaming symbol on wrapped handler', () => { + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + return 42; + }); + (streamingHandler as any)[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE; + + const wrappedHandler = wrapHandler(streamingHandler); + + expect((wrappedHandler as any)[AWS_HANDLER_STREAMING_SYMBOL]).toBe(AWS_HANDLER_STREAMING_RESPONSE); + }); + + test('event, responseStream and context are correctly passed along', async () => { + expect.assertions(3); + + const streamingHandler = vi.fn(async (event, responseStream, context) => { + expect(event).toHaveProperty('fortySix'); + expect(responseStream).toBe(mockResponseStream); + expect(context).toHaveProperty('ytho'); + return 'success'; + }); + (streamingHandler as any)[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE; + + const wrappedHandler = wrapHandler(streamingHandler); + await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + }); + + test('capture error from handler execution', async () => { + expect.assertions(4); + + const error = new Error('streaming handler error'); + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + throw error; + }); + (streamingHandler as any)[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE; + + const wrappedHandler = wrapHandler(streamingHandler); + + try { + await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + } catch { + expectScopeSettings(); + expect(mockCaptureException).toBeCalledWith(error, expect.any(Function)); + expect(mockFlush).toBeCalled(); + } + }); + + test('capture stream errors', async () => { + expect.assertions(3); + + const streamError = new Error('stream error'); + const streamingHandler = vi.fn(async (_event, responseStream, _context) => { + // Simulate stream error by calling the error listener + const errorListener = (responseStream.on as any).mock.calls.find((call: any[]) => call[0] === 'error')?.[1]; + if (errorListener) { + errorListener(streamError); + } + return 'success'; + }); + (streamingHandler as any)[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE; + + const wrappedHandler = wrapHandler(streamingHandler); + await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + + expect(mockResponseStream.on).toHaveBeenCalledWith('error', expect.any(Function)); + expect(mockCaptureException).toHaveBeenCalledWith(streamError, expect.any(Function)); + expect(streamingHandler).toHaveBeenCalledWith(fakeEvent, mockResponseStream, fakeContext); + }); + + test('streaming handler with flushTimeout option', async () => { + expect.assertions(2); + + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + return 'flushed'; + }); + (streamingHandler as any)[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE; + + const wrappedHandler = wrapHandler(streamingHandler, { flushTimeout: 5000 }); + const result = await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + + expect(result).toBe('flushed'); + expect(mockFlush).toBeCalledWith(5000); + }); + + test('streaming handler with captureTimeoutWarning enabled', async () => { + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + // Simulate some delay to trigger timeout warning + await new Promise(resolve => setTimeout(resolve, DEFAULT_EXECUTION_TIME)); + return 'completed'; + }); + (streamingHandler as any)[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE; + + const wrappedHandler = wrapHandler(streamingHandler); + await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + + expect(mockWithScope).toBeCalledTimes(2); + expect(mockCaptureMessage).toBeCalled(); + expect(mockScope.setTag).toBeCalledWith('timeout', '1s'); + }); + + test('marks streaming handler captured errors as unhandled', async () => { + expect.assertions(3); + + const error = new Error('streaming error'); + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + throw error; + }); + (streamingHandler as any)[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE; + + const wrappedHandler = wrapHandler(streamingHandler); + + try { + await (wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext); + } catch { + expect(mockCaptureException).toBeCalledWith(error, expect.any(Function)); + + const scopeFunction = mockCaptureException.mock.calls[0]?.[1]; + const event: Event = { exception: { values: [{}] } }; + let evtProcessor: ((e: Event) => Event) | undefined = undefined; + if (scopeFunction) { + scopeFunction({ addEventProcessor: vi.fn().mockImplementation(proc => (evtProcessor = proc)) }); + } + + expect(evtProcessor).toBeInstanceOf(Function); + // @ts-expect-error just mocking around... + expect(evtProcessor!(event).exception.values[0]?.mechanism).toEqual({ + handled: false, + type: 'auto.function.aws-serverless.handler', + }); + } + }); + + test('should not throw when flush rejects with streaming handler', async () => { + const streamingHandler = vi.fn(async (_event, _responseStream, _context) => { + return 'flush-error-test'; + }); + (streamingHandler as any)[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE; + + const wrappedHandler = wrapHandler(streamingHandler); + mockFlush.mockImplementationOnce(() => Promise.reject(new Error('flush failed'))); + + await expect((wrappedHandler as any)(fakeEvent, mockResponseStream, fakeContext)).resolves.toBe( + 'flush-error-test', + ); + }); + }); + test('marks the captured error as unhandled', async () => { expect.assertions(3); From 135dde71ed4fdbd7928850b6f07f959a52bd8f5c Mon Sep 17 00:00:00 2001 From: Martin Sonnberger Date: Tue, 26 Aug 2025 18:07:42 +0200 Subject: [PATCH 2/3] lint --- packages/aws-serverless/test/sdk.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/aws-serverless/test/sdk.test.ts b/packages/aws-serverless/test/sdk.test.ts index 25ed0411c3d0..3bf2c42b8fd2 100644 --- a/packages/aws-serverless/test/sdk.test.ts +++ b/packages/aws-serverless/test/sdk.test.ts @@ -2,7 +2,7 @@ import type { Event } from '@sentry/core'; import type { Callback, Handler } from 'aws-lambda'; import { beforeEach, describe, expect, test, vi } from 'vitest'; import { init } from '../src/init'; -import { AWS_HANDLER_STREAMING_SYMBOL, AWS_HANDLER_STREAMING_RESPONSE, wrapHandler } from '../src/sdk'; +import { AWS_HANDLER_STREAMING_RESPONSE, AWS_HANDLER_STREAMING_SYMBOL, wrapHandler } from '../src/sdk'; const mockFlush = vi.fn((...args) => Promise.resolve(args)); const mockWithScope = vi.fn(); From e2903bf5b548abd429da4321e43156993f1364e8 Mon Sep 17 00:00:00 2001 From: Martin Sonnberger Date: Wed, 27 Aug 2025 09:14:59 +0200 Subject: [PATCH 3/3] tweaks --- .../instrumentation.ts | 19 ++++++++++++------- packages/aws-serverless/src/sdk.ts | 2 +- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/packages/aws-serverless/src/integration/instrumentation-aws-lambda/instrumentation.ts b/packages/aws-serverless/src/integration/instrumentation-aws-lambda/instrumentation.ts index 94df0311f12e..1e51605c2afa 100644 --- a/packages/aws-serverless/src/integration/instrumentation-aws-lambda/instrumentation.ts +++ b/packages/aws-serverless/src/integration/instrumentation-aws-lambda/instrumentation.ts @@ -75,6 +75,7 @@ const headerGetter: TextMapGetter = { export const lambdaMaxInitInMilliseconds = 10_000; const AWS_HANDLER_STREAMING_SYMBOL = Symbol.for('aws.lambda.runtime.handler.streaming'); +const AWS_HANDLER_HIGHWATERMARK_SYMBOL = Symbol.for('aws.lambda.runtime.handler.streaming.highWaterMark'); const AWS_HANDLER_STREAMING_RESPONSE = 'response'; /** @@ -106,11 +107,14 @@ export class AwsLambdaInstrumentation extends InstrumentationBase { - // Add the streaming symbols that the instrumentation looks for + streamifyResponse: (handler: any, options: any) => { handler[AWS_HANDLER_STREAMING_SYMBOL] = AWS_HANDLER_STREAMING_RESPONSE; + if (typeof options?.highWaterMark === 'number') { + handler[AWS_HANDLER_HIGHWATERMARK_SYMBOL] = parseInt(options.highWaterMark); + } return handler; }, }; @@ -208,11 +212,12 @@ export class AwsLambdaInstrumentation extends InstrumentationBase)[symbol] = ( - original as unknown as Record - )[symbol]; - } + (patchedHandler as unknown as Record)[AWS_HANDLER_STREAMING_SYMBOL] = ( + original as unknown as Record + )[AWS_HANDLER_STREAMING_SYMBOL]; + (patchedHandler as unknown as Record)[AWS_HANDLER_HIGHWATERMARK_SYMBOL] = ( + original as unknown as Record + )[AWS_HANDLER_HIGHWATERMARK_SYMBOL]; return wrapHandler(patchedHandler) as T; } diff --git a/packages/aws-serverless/src/sdk.ts b/packages/aws-serverless/src/sdk.ts index 32da13d3cc00..fd647d3a3376 100644 --- a/packages/aws-serverless/src/sdk.ts +++ b/packages/aws-serverless/src/sdk.ts @@ -133,7 +133,7 @@ function setupTimeoutWatning(context: Context, options: WrapperOptions): NodeJS. return undefined; } -export const AWS_HANDLER_HIGHWATERMARK_SYMBOL = Symbol.for('aws.lambda.runtime.handler.highWaterMark'); +export const AWS_HANDLER_HIGHWATERMARK_SYMBOL = Symbol.for('aws.lambda.runtime.handler.streaming.highWaterMark'); export const AWS_HANDLER_STREAMING_SYMBOL = Symbol.for('aws.lambda.runtime.handler.streaming'); export const AWS_HANDLER_STREAMING_RESPONSE = 'response';