Skip to content

fix(rhea): DSM context leak between concurrent consumers#7581

Draft
robcarlan-datadog wants to merge 2 commits intomasterfrom
rcarlan/dsm-rhea-context-leak
Draft

fix(rhea): DSM context leak between concurrent consumers#7581
robcarlan-datadog wants to merge 2 commits intomasterfrom
rcarlan/dsm-rhea-context-leak

Conversation

@robcarlan-datadog
Copy link
Copy Markdown
Contributor

Summary

  • Adds a regression test for the DSM context leak bug in rhea
  • Rhea's producer is different from the other plugins: the DSM checkpoint happens in a separate encode hook (addTraceSub('encode', ...)), not in bindStart or start
  • The fix applied to kafkajs/amqplib/bullmq/google-cloud-pubsub (moving DSM to start) doesn't cover rhea's producer path

Root Cause

Same underlying issue as #7395ctx.currentStore doesn't include DSM context. But the producer fix needs a different approach since the checkpoint happens in encode, which runs inside the producer's store.run scope where the stale store is active.

Test plan

  • Regression test fails deterministically (10/10 runs, rhea 1.0.0 and 3.0.4)
  • Fix TBD

🤖 Generated with Claude Code

…nt consumers

Rhea's producer DSM checkpoint happens in a separate encode hook (not
bindStart), so the fix applied to other plugins doesn't cover it. This
test verifies the bug exists and will guide the fix.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Feb 19, 2026

Overall package size

Self size: 4.69 MB
Deduped: 5.53 MB
No deduping: 5.53 MB

Dependency sizes | name | version | self size | total size | |------|---------|-----------|------------| | import-in-the-middle | 2.0.6 | 81.92 kB | 816.75 kB | | dc-polyfill | 0.1.10 | 26.73 kB | 26.73 kB |

🤖 This report was automatically generated by heaviest-objects-in-the-universe

Same fix as the other plugins — DSM context must be set in start()
(which runs after the store is bound) rather than bindStart() (where
enterWith calls leak to the outer context).

Note: the producer side still needs fixing since its DSM checkpoint
happens in a separate encode hook.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@datadog-datadog-prod-us1
Copy link
Copy Markdown

datadog-datadog-prod-us1 Bot commented Feb 19, 2026

⚠️ Tests

Fix all issues with BitsAI or with Cursor

⚠️ Warnings

❄️ 1 New flaky test detected

RASP - downstream request integration Downstream configuration with zero max count limit skips downstream analysis when limit is zero from with zero max count limit (Datadog) (Fix with Cursor)
timeout, additionally:
Error: Telemetry namespace is not appsec
    at /Users/runner/work/dd-trace-js/dd-trace-js/packages/dd-trace/test/appsec/downstream_requests.integration.spec.js:98:15
    at FakeAgent.messageHandler (/Users/runner/work/dd-trace-js/dd-trace-js/integration-tests/helpers/fake-agent.js:261:9)
    at FakeAgent.emit (node:events:508:28)
    at /Users/runner/work/dd-trace-js/dd-trace-js/integration-tests/helpers/fake-agent.js:484:11
    at Layer.handleRequest (/Users/runner/work/dd-trace-js/dd-trace-js/node_modules/router/lib/layer.js:152:17)
    at next (/Users/runner/work/dd-trace-js/dd-trace-js/node_modules/router/lib/route.js:157:13)
    at Route.dispatch (/Users/runner/work/dd-trace-js/dd-trace-js/node_modules/router/lib/route.js:117:3)
    at handle (/Users/runner/work/dd-trace-js/dd-trace-js/node_modules/router/index.js:435:11)
...

ℹ️ Info

🧪 All tests passed

This comment will be updated automatically if new data arrives.
🔗 Commit SHA: 2ac4c92 | Docs | Datadog PR Page | Was this helpful? Give us feedback!

robcarlan-datadog added a commit that referenced this pull request Feb 19, 2026
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@pr-commenter
Copy link
Copy Markdown

pr-commenter Bot commented Feb 19, 2026

Benchmarks

Benchmark execution time: 2026-02-19 19:14:29

Comparing candidate commit 2ac4c92 in PR branch rcarlan/dsm-rhea-context-leak with baseline commit 495b56a in branch master.

Found 0 performance improvements and 0 performance regressions! Performance is the same for 229 metrics, 31 unstable metrics.

@codecov
Copy link
Copy Markdown

codecov Bot commented Feb 19, 2026

Codecov Report

❌ Patch coverage is 0% with 9 lines in your changes missing coverage. Please review.
✅ Project coverage is 80.19%. Comparing base (495b56a) to head (2ac4c92).
⚠️ Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
packages/datadog-plugin-rhea/src/consumer.js 0.00% 9 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #7581      +/-   ##
==========================================
- Coverage   80.29%   80.19%   -0.10%     
==========================================
  Files         732      731       -1     
  Lines       31468    31379      -89     
==========================================
- Hits        25267    25165     -102     
- Misses       6201     6214      +13     
Flag Coverage Δ
aiguard-macos 38.93% <ø> (-0.18%) ⬇️
aiguard-ubuntu 39.05% <ø> (-0.18%) ⬇️
aiguard-windows 38.79% <ø> (-0.18%) ⬇️
apm-capabilities-tracing-macos 48.62% <0.00%> (+0.23%) ⬆️
apm-capabilities-tracing-ubuntu 48.65% <0.00%> (+0.23%) ⬆️
apm-capabilities-tracing-windows 48.35% <0.00%> (+0.22%) ⬆️
apm-integrations-child-process 38.50% <ø> (-0.18%) ⬇️
apm-integrations-couchbase-18 37.27% <ø> (-0.31%) ⬇️
apm-integrations-couchbase-eol 37.90% <ø> (-0.02%) ⬇️
apm-integrations-oracledb 37.73% <ø> (-0.18%) ⬇️
appsec-express 55.53% <ø> (-0.21%) ⬇️
appsec-fastify 51.84% <ø> (-0.21%) ⬇️
appsec-graphql 52.03% <ø> (-0.19%) ⬇️
appsec-kafka 44.48% <ø> (-0.18%) ⬇️
appsec-ldapjs 44.09% <ø> (-0.18%) ⬇️
appsec-lodash 43.77% <ø> (-0.17%) ⬇️
appsec-macos ?
appsec-mongodb-core 48.84% <ø> (-0.21%) ⬇️
appsec-mongoose 49.64% <ø> (-0.21%) ⬇️
appsec-mysql 51.01% <ø> (-0.21%) ⬇️
appsec-node-serialize 43.29% <ø> (-0.17%) ⬇️
appsec-passport 47.78% <ø> (-0.21%) ⬇️
appsec-postgres 50.77% <ø> (-0.21%) ⬇️
appsec-sourcing 42.64% <ø> (-0.17%) ⬇️
appsec-template 43.46% <ø> (-0.17%) ⬇️
appsec-ubuntu 58.68% <ø> (-0.21%) ⬇️
appsec-windows 58.46% <ø> (-0.23%) ⬇️
instrumentations-instrumentation-bluebird 32.20% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-body-parser 40.51% <ø> (-0.18%) ⬇️
instrumentations-instrumentation-child_process 37.82% <ø> (-0.17%) ⬇️
instrumentations-instrumentation-cookie-parser 34.24% <ø> (-0.18%) ⬇️
instrumentations-instrumentation-express 34.58% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-express-mongo-sanitize 34.37% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-express-session 40.13% <ø> (-0.18%) ⬇️
instrumentations-instrumentation-fs 31.80% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-generic-pool 29.76% <ø> (ø)
instrumentations-instrumentation-http 39.85% <ø> (-0.18%) ⬇️
instrumentations-instrumentation-knex 32.20% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-mongoose 33.37% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-multer 40.25% <ø> (-0.18%) ⬇️
instrumentations-instrumentation-mysql2 38.29% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-passport 44.09% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-passport-http 43.76% <ø> (-0.18%) ⬇️
instrumentations-instrumentation-passport-local 44.30% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-pg 37.71% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-promise 32.13% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-promise-js 32.13% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-q 32.18% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-url 32.10% <ø> (-0.19%) ⬇️
instrumentations-instrumentation-when 32.15% <ø> (-0.19%) ⬇️
llmobs-ai 41.33% <ø> (-0.18%) ⬇️
llmobs-anthropic 40.32% <ø> (-0.18%) ⬇️
llmobs-bedrock 39.25% <ø> (-0.15%) ⬇️
llmobs-google-genai 39.84% <ø> (-0.16%) ⬇️
llmobs-langchain 39.43% <ø> (-0.14%) ⬇️
llmobs-openai 44.14% <ø> (-0.19%) ⬇️
llmobs-vertex-ai 40.11% <ø> (-0.10%) ⬇️
platform-core 29.71% <ø> (ø)
platform-esbuild 32.89% <ø> (ø)
platform-instrumentations-misc 40.53% <ø> (ø)
platform-shimmer 36.14% <ø> (ø)
platform-unit-guardrails 31.27% <ø> (ø)
plugins-azure-event-hubs 24.02% <ø> (ø)
plugins-azure-service-bus ?
plugins-bullmq 43.66% <ø> (-0.15%) ⬇️
plugins-cassandra 37.77% <ø> (-0.18%) ⬇️
plugins-cookie 25.08% <ø> (ø)
plugins-cookie-parser 24.87% <ø> (ø)
plugins-crypto 24.72% <ø> (ø)
plugins-dd-trace-api 38.36% <ø> (-0.18%) ⬇️
plugins-express-mongo-sanitize 25.04% <ø> (ø)
plugins-express-session 24.83% <ø> (ø)
plugins-fastify 42.27% <ø> (-0.19%) ⬇️
plugins-fetch 38.32% <ø> (-0.17%) ⬇️
plugins-fs 38.61% <ø> (-0.18%) ⬇️
plugins-generic-pool 24.06% <ø> (ø)
plugins-google-cloud-pubsub 45.46% <ø> (-0.14%) ⬇️
plugins-grpc 40.97% <ø> (-0.18%) ⬇️
plugins-handlebars 25.08% <ø> (ø)
plugins-hapi 40.14% <ø> (-0.18%) ⬇️
plugins-hono 40.41% <ø> (-0.18%) ⬇️
plugins-ioredis 38.41% <ø> (-0.18%) ⬇️
plugins-knex 24.80% <ø> (ø)
plugins-ldapjs 22.61% <ø> (ø)
plugins-light-my-request 24.48% <ø> (ø)
plugins-limitd-client 32.50% <ø> (-0.19%) ⬇️
plugins-lodash 24.13% <ø> (ø)
plugins-mariadb 39.49% <ø> (-0.20%) ⬇️
plugins-memcached 38.15% <ø> (-0.18%) ⬇️
plugins-microgateway-core 39.17% <ø> (-0.18%) ⬇️
plugins-moleculer 40.53% <ø> (-0.18%) ⬇️
plugins-mongodb 39.20% <ø> (-0.16%) ⬇️
plugins-mongodb-core 39.03% <ø> (-0.17%) ⬇️
plugins-mongoose 38.88% <ø> (-0.16%) ⬇️
plugins-multer 24.83% <ø> (ø)
plugins-mysql 39.17% <ø> (-0.14%) ⬇️
plugins-mysql2 39.27% <ø> (-0.17%) ⬇️
plugins-node-serialize 25.12% <ø> (ø)
plugins-opensearch 37.60% <ø> (-0.18%) ⬇️
plugins-passport-http 24.91% <ø> (ø)
plugins-postgres 35.69% <ø> (-0.12%) ⬇️
plugins-process 24.72% <ø> (ø)
plugins-pug 25.08% <ø> (ø)
plugins-redis 38.89% <ø> (-0.18%) ⬇️
plugins-router 43.03% <ø> (-0.20%) ⬇️
plugins-sequelize 23.66% <ø> (ø)
plugins-test-and-upstream-amqp10 38.48% <ø> (-0.18%) ⬇️
plugins-test-and-upstream-amqplib 43.85% <ø> (-0.16%) ⬇️
plugins-test-and-upstream-apollo 39.03% <ø> (-0.16%) ⬇️
plugins-test-and-upstream-avsc 38.70% <ø> (-0.23%) ⬇️
plugins-test-and-upstream-bunyan 33.79% <ø> (-0.20%) ⬇️
plugins-test-and-upstream-connect 40.81% <ø> (-0.19%) ⬇️
plugins-test-and-upstream-graphql 40.15% <ø> (-0.18%) ⬇️
plugins-test-and-upstream-koa 40.39% <ø> (-0.18%) ⬇️
plugins-test-and-upstream-protobufjs 38.93% <ø> (-0.23%) ⬇️
plugins-test-and-upstream-rhea ?
plugins-undici 39.11% <ø> (-0.17%) ⬇️
plugins-url 24.72% <ø> (ø)
plugins-valkey 38.07% <ø> (-0.18%) ⬇️
plugins-vm 24.72% <ø> (ø)
plugins-winston 33.99% <ø> (-0.19%) ⬇️
plugins-ws 41.91% <ø> (-0.19%) ⬇️
profiling-macos 39.84% <ø> (-0.24%) ⬇️
profiling-ubuntu 39.97% <ø> (-0.24%) ⬇️
profiling-windows 41.20% <ø> (-0.24%) ⬇️
serverless-azure-functions-client 23.75% <ø> (ø)
serverless-azure-functions-eventhubs 23.75% <ø> (ø)
serverless-azure-functions-servicebus 23.75% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

robcarlan-datadog added a commit that referenced this pull request Feb 23, 2026
* fix(kafkajs): sync DSM context to currentStore to prevent context leaking

When processing concurrent Kafka messages, the DSM context was being set
via enterWith() on AsyncLocalStorage but not synced to ctx.currentStore.
Since ctx.currentStore is what gets returned from bindStart and bound to
async continuations via runStores, this caused DSM context to leak between
concurrent message handlers.

The fix syncs the DSM context to ctx.currentStore after DSM operations
complete, ensuring each handler's async continuations maintain the correct
DSM context.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test(dsm): add regression test for context propagation race condition

Add tests that verify DSM context is properly scoped to each handler's
async continuations when using diagnostic channels with runStores.

The tests verify that:
1. ctx.currentStore has dataStreamsContext after setDataStreamsContext
2. Concurrent handlers maintain isolated DSM contexts
3. DSM context persists through multiple async boundaries

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(dsm): sync DSM context to currentStore to prevent context leaking

Add syncToStore helper in DSM context module that syncs DSM context
from AsyncLocalStorage to ctx.currentStore after DSM operations.

This fixes a race condition where DSM context was being set via
enterWith() but not synced to ctx.currentStore, which is what gets
bound to async continuations via store.run(). Without syncing, DSM
context would leak between concurrent message handlers.

Updated plugins:
- kafkajs (consumer, producer)
- amqplib (consumer, producer)
- bullmq (consumer, producer)
- rhea (consumer)
- google-cloud-pubsub (consumer, producer)
- aws-sdk (sqs, kinesis)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* remove comment

* chore: remove contrived regression test

The unit test was useful for validating the fix during development
but is contrived and doesn't add value as a permanent test.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test(dsm): add unit tests for syncToStore and integration spy tests

Add comprehensive tests for the new syncToStore helper:
- Unit tests for syncToStore in context.spec.js covering normal
  operation, edge cases, and integration with setDataStreamsContext
- Spy tests in 6 integration test files to verify syncToStore is
  called after produce and consume operations

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(dsm): call syncToStore via module object for testability

Change all plugins to call DataStreamsContext.syncToStore(ctx)
instead of destructuring syncToStore at import time. This allows
sinon spies to intercept calls during testing.

When functions are destructured at require-time, they bind to the
original function reference. Spies set up later on the module
object don't affect these bindings. Calling via the module object
ensures spies work correctly.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(dsm): remove invalid producer-side syncToStore tests for AWS SDK

The syncToStore fix only applies to the consumer path where bindStart
is used. AWS SDK plugins (SQS, Kinesis) use requestInject for producers,
which doesn't need context synchronization since:

1. requestInject is called before the request is sent
2. DSM context is encoded directly into the message
3. There's no async continuation where context leaking would occur

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* chore: remove unnecessary comments from test files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(dsm): move DSM logic from bindStart to start for correct context binding

By moving DSM checkpoint/encoding logic into start() (which runs after
the child context is bound), the DSM context naturally lands in the
correct async store — eliminating the need for the syncToStore workaround
in kafkajs, amqplib, bullmq, and rhea consumer plugins.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(dsm): remove syncToStore functionality

The actual fix for the context propagation race condition is moving DSM
logic from bindStart to start. The syncToStore workaround is unnecessary.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(dsm): move DSM logic from bindStart to start for google-cloud-pubsub

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* chore: remove extra blank lines in google-cloud-pubsub

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* chore: fix comma placement in amqplib producer setCheckpoint call

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(kafkajs): add regression test for DSM context leak between concurrent consumers

When two KafkaJS consumers process messages concurrently and each
produces to a different topic, the DSM (Data Streams Monitoring) context
leaks between them. The first consumer to process loses its DSM context
entirely (null parent), while the second consumer picks up the first's
context instead of its own.

This test forces the interleaving by using promise gates to ensure both
eachMessage handlers have fired before either produces, reliably
reproducing the bug.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(kafkajs): remove redundant comments

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(dsm): add regression tests for DSM context leak in amqplib, bullmq, rhea, google-cloud-pubsub

Same root cause as the kafkajs test: ctx.currentStore is set by
startSpan before decodeDataStreamsContext/setCheckpoint call enterWith,
so the DSM context is never included in the bound store for async
continuations.

Verified deterministic: 10/10 failures across all plugins and versions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(dsm): fix lint and remove redundant comments

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(rhea): use const for senderAOut

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(rhea): remove rhea DSM regression test

Rhea's producer DSM checkpoint happens in a separate encode hook, not
in bindStart/start, so the current fix doesn't cover it. Will be
addressed in a separate PR.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(rhea): remove rhea consumer changes, tracked separately in #7581

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(dsm): move google-cloud-pubsub changes to separate PR

Moved DSM context fix and regression test for google-cloud-pubsub
to #7582 to fix independently.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
dd-octo-sts Bot pushed a commit that referenced this pull request Feb 24, 2026
* fix(kafkajs): sync DSM context to currentStore to prevent context leaking

When processing concurrent Kafka messages, the DSM context was being set
via enterWith() on AsyncLocalStorage but not synced to ctx.currentStore.
Since ctx.currentStore is what gets returned from bindStart and bound to
async continuations via runStores, this caused DSM context to leak between
concurrent message handlers.

The fix syncs the DSM context to ctx.currentStore after DSM operations
complete, ensuring each handler's async continuations maintain the correct
DSM context.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test(dsm): add regression test for context propagation race condition

Add tests that verify DSM context is properly scoped to each handler's
async continuations when using diagnostic channels with runStores.

The tests verify that:
1. ctx.currentStore has dataStreamsContext after setDataStreamsContext
2. Concurrent handlers maintain isolated DSM contexts
3. DSM context persists through multiple async boundaries

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(dsm): sync DSM context to currentStore to prevent context leaking

Add syncToStore helper in DSM context module that syncs DSM context
from AsyncLocalStorage to ctx.currentStore after DSM operations.

This fixes a race condition where DSM context was being set via
enterWith() but not synced to ctx.currentStore, which is what gets
bound to async continuations via store.run(). Without syncing, DSM
context would leak between concurrent message handlers.

Updated plugins:
- kafkajs (consumer, producer)
- amqplib (consumer, producer)
- bullmq (consumer, producer)
- rhea (consumer)
- google-cloud-pubsub (consumer, producer)
- aws-sdk (sqs, kinesis)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* remove comment

* chore: remove contrived regression test

The unit test was useful for validating the fix during development
but is contrived and doesn't add value as a permanent test.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test(dsm): add unit tests for syncToStore and integration spy tests

Add comprehensive tests for the new syncToStore helper:
- Unit tests for syncToStore in context.spec.js covering normal
  operation, edge cases, and integration with setDataStreamsContext
- Spy tests in 6 integration test files to verify syncToStore is
  called after produce and consume operations

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(dsm): call syncToStore via module object for testability

Change all plugins to call DataStreamsContext.syncToStore(ctx)
instead of destructuring syncToStore at import time. This allows
sinon spies to intercept calls during testing.

When functions are destructured at require-time, they bind to the
original function reference. Spies set up later on the module
object don't affect these bindings. Calling via the module object
ensures spies work correctly.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(dsm): remove invalid producer-side syncToStore tests for AWS SDK

The syncToStore fix only applies to the consumer path where bindStart
is used. AWS SDK plugins (SQS, Kinesis) use requestInject for producers,
which doesn't need context synchronization since:

1. requestInject is called before the request is sent
2. DSM context is encoded directly into the message
3. There's no async continuation where context leaking would occur

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* chore: remove unnecessary comments from test files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(dsm): move DSM logic from bindStart to start for correct context binding

By moving DSM checkpoint/encoding logic into start() (which runs after
the child context is bound), the DSM context naturally lands in the
correct async store — eliminating the need for the syncToStore workaround
in kafkajs, amqplib, bullmq, and rhea consumer plugins.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(dsm): remove syncToStore functionality

The actual fix for the context propagation race condition is moving DSM
logic from bindStart to start. The syncToStore workaround is unnecessary.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(dsm): move DSM logic from bindStart to start for google-cloud-pubsub

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* chore: remove extra blank lines in google-cloud-pubsub

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* chore: fix comma placement in amqplib producer setCheckpoint call

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(kafkajs): add regression test for DSM context leak between concurrent consumers

When two KafkaJS consumers process messages concurrently and each
produces to a different topic, the DSM (Data Streams Monitoring) context
leaks between them. The first consumer to process loses its DSM context
entirely (null parent), while the second consumer picks up the first's
context instead of its own.

This test forces the interleaving by using promise gates to ensure both
eachMessage handlers have fired before either produces, reliably
reproducing the bug.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(kafkajs): remove redundant comments

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(dsm): add regression tests for DSM context leak in amqplib, bullmq, rhea, google-cloud-pubsub

Same root cause as the kafkajs test: ctx.currentStore is set by
startSpan before decodeDataStreamsContext/setCheckpoint call enterWith,
so the DSM context is never included in the bound store for async
continuations.

Verified deterministic: 10/10 failures across all plugins and versions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(dsm): fix lint and remove redundant comments

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(rhea): use const for senderAOut

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(rhea): remove rhea DSM regression test

Rhea's producer DSM checkpoint happens in a separate encode hook, not
in bindStart/start, so the current fix doesn't cover it. Will be
addressed in a separate PR.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(rhea): remove rhea consumer changes, tracked separately in #7581

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(dsm): move google-cloud-pubsub changes to separate PR

Moved DSM context fix and regression test for google-cloud-pubsub
to #7582 to fix independently.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
juan-fernandez pushed a commit that referenced this pull request Mar 5, 2026
* fix(kafkajs): sync DSM context to currentStore to prevent context leaking

When processing concurrent Kafka messages, the DSM context was being set
via enterWith() on AsyncLocalStorage but not synced to ctx.currentStore.
Since ctx.currentStore is what gets returned from bindStart and bound to
async continuations via runStores, this caused DSM context to leak between
concurrent message handlers.

The fix syncs the DSM context to ctx.currentStore after DSM operations
complete, ensuring each handler's async continuations maintain the correct
DSM context.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test(dsm): add regression test for context propagation race condition

Add tests that verify DSM context is properly scoped to each handler's
async continuations when using diagnostic channels with runStores.

The tests verify that:
1. ctx.currentStore has dataStreamsContext after setDataStreamsContext
2. Concurrent handlers maintain isolated DSM contexts
3. DSM context persists through multiple async boundaries

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(dsm): sync DSM context to currentStore to prevent context leaking

Add syncToStore helper in DSM context module that syncs DSM context
from AsyncLocalStorage to ctx.currentStore after DSM operations.

This fixes a race condition where DSM context was being set via
enterWith() but not synced to ctx.currentStore, which is what gets
bound to async continuations via store.run(). Without syncing, DSM
context would leak between concurrent message handlers.

Updated plugins:
- kafkajs (consumer, producer)
- amqplib (consumer, producer)
- bullmq (consumer, producer)
- rhea (consumer)
- google-cloud-pubsub (consumer, producer)
- aws-sdk (sqs, kinesis)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* remove comment

* chore: remove contrived regression test

The unit test was useful for validating the fix during development
but is contrived and doesn't add value as a permanent test.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* test(dsm): add unit tests for syncToStore and integration spy tests

Add comprehensive tests for the new syncToStore helper:
- Unit tests for syncToStore in context.spec.js covering normal
  operation, edge cases, and integration with setDataStreamsContext
- Spy tests in 6 integration test files to verify syncToStore is
  called after produce and consume operations

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(dsm): call syncToStore via module object for testability

Change all plugins to call DataStreamsContext.syncToStore(ctx)
instead of destructuring syncToStore at import time. This allows
sinon spies to intercept calls during testing.

When functions are destructured at require-time, they bind to the
original function reference. Spies set up later on the module
object don't affect these bindings. Calling via the module object
ensures spies work correctly.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(dsm): remove invalid producer-side syncToStore tests for AWS SDK

The syncToStore fix only applies to the consumer path where bindStart
is used. AWS SDK plugins (SQS, Kinesis) use requestInject for producers,
which doesn't need context synchronization since:

1. requestInject is called before the request is sent
2. DSM context is encoded directly into the message
3. There's no async continuation where context leaking would occur

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* chore: remove unnecessary comments from test files

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(dsm): move DSM logic from bindStart to start for correct context binding

By moving DSM checkpoint/encoding logic into start() (which runs after
the child context is bound), the DSM context naturally lands in the
correct async store — eliminating the need for the syncToStore workaround
in kafkajs, amqplib, bullmq, and rhea consumer plugins.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(dsm): remove syncToStore functionality

The actual fix for the context propagation race condition is moving DSM
logic from bindStart to start. The syncToStore workaround is unnecessary.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(dsm): move DSM logic from bindStart to start for google-cloud-pubsub

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* chore: remove extra blank lines in google-cloud-pubsub

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* chore: fix comma placement in amqplib producer setCheckpoint call

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(kafkajs): add regression test for DSM context leak between concurrent consumers

When two KafkaJS consumers process messages concurrently and each
produces to a different topic, the DSM (Data Streams Monitoring) context
leaks between them. The first consumer to process loses its DSM context
entirely (null parent), while the second consumer picks up the first's
context instead of its own.

This test forces the interleaving by using promise gates to ensure both
eachMessage handlers have fired before either produces, reliably
reproducing the bug.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(kafkajs): remove redundant comments

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(dsm): add regression tests for DSM context leak in amqplib, bullmq, rhea, google-cloud-pubsub

Same root cause as the kafkajs test: ctx.currentStore is set by
startSpan before decodeDataStreamsContext/setCheckpoint call enterWith,
so the DSM context is never included in the bound store for async
continuations.

Verified deterministic: 10/10 failures across all plugins and versions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(dsm): fix lint and remove redundant comments

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(rhea): use const for senderAOut

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(rhea): remove rhea DSM regression test

Rhea's producer DSM checkpoint happens in a separate encode hook, not
in bindStart/start, so the current fix doesn't cover it. Will be
addressed in a separate PR.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(rhea): remove rhea consumer changes, tracked separately in #7581

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* test(dsm): move google-cloud-pubsub changes to separate PR

Moved DSM context fix and regression test for google-cloud-pubsub
to #7582 to fix independently.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant