fix(dsm): move dsm plugin init to start from bindStart#7395
robcarlan-datadog merged 23 commits into `master`
Conversation
Overall package size

Self size: 4.69 MB

Dependency sizes:

| name | version | self size | total size |
|------|---------|-----------|------------|
| import-in-the-middle | 2.0.6 | 81.92 kB | 816.75 kB |
| dc-polyfill | 0.1.10 | 26.73 kB | 26.73 kB |

🤖 This report was automatically generated by heaviest-objects-in-the-universe
Benchmarks

Benchmark execution time: 2026-02-19 20:16:27
Comparing candidate commit 2720a11 in PR branch
Found 0 performance improvements and 0 performance regressions! Performance is the same for 226 metrics, 34 unstable metrics.
Codecov Report

❌ Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@
## master #7395 +/- ##
==========================================
- Coverage 80.32% 80.30% -0.02%
==========================================
Files 733 733
Lines 31546 31560 +14
==========================================
+ Hits 25338 25343 +5
- Misses 6208 6217 +9
✅ Tests 🎉 All green! ❄️ No new flaky tests detected
🔗 Commit SHA: 2720a11 | Docs | Datadog PR Page
Maybe we would be better off having some sort of `bindStart` static helper in the producer / consumer base classes? Something to signal that it should always happen at the end of `bindStart`? That would at least reduce the number of requires and hopefully ensure that the author of the next messaging plugin remembers to call the method.
rochdev left a comment:
Such a helper should not be needed if we use Diagnostics Channel properly. There should either be a completely separate store or the syncing should happen implicitly.
I marked it as draft while we don't have a conclusion yet. @rochdev @robcarlan-datadog, what about having a call to discuss the implementation together? :)
@BridgeAR thank you!
fix(kafkajs): sync DSM context to currentStore to prevent context leaking

When processing concurrent Kafka messages, the DSM context was being set via enterWith() on AsyncLocalStorage but not synced to ctx.currentStore. Since ctx.currentStore is what gets returned from bindStart and bound to async continuations via runStores, this caused DSM context to leak between concurrent message handlers. The fix syncs the DSM context to ctx.currentStore after DSM operations complete, ensuring each handler's async continuations maintain the correct DSM context.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
test(dsm): add regression test for context propagation race condition

Add tests that verify DSM context is properly scoped to each handler's async continuations when using diagnostic channels with runStores. The tests verify that:
1. ctx.currentStore has dataStreamsContext after setDataStreamsContext
2. Concurrent handlers maintain isolated DSM contexts
3. DSM context persists through multiple async boundaries

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
fix(dsm): sync DSM context to currentStore to prevent context leaking

Add syncToStore helper in DSM context module that syncs DSM context from AsyncLocalStorage to ctx.currentStore after DSM operations. This fixes a race condition where DSM context was being set via enterWith() but not synced to ctx.currentStore, which is what gets bound to async continuations via store.run(). Without syncing, DSM context would leak between concurrent message handlers.

Updated plugins:
- kafkajs (consumer, producer)
- amqplib (consumer, producer)
- bullmq (consumer, producer)
- rhea (consumer)
- google-cloud-pubsub (consumer, producer)
- aws-sdk (sqs, kinesis)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
fix(dsm): remove syncToStore functionality

The actual fix for the context propagation race condition is moving DSM logic from bindStart to start. The syncToStore workaround is unnecessary.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
test(kafkajs): add regression test for DSM context leak between concurrent consumers

When two KafkaJS consumers process messages concurrently and each produces to a different topic, the DSM (Data Streams Monitoring) context leaks between them. The first consumer to process loses its DSM context entirely (null parent), while the second consumer picks up the first's context instead of its own. This test forces the interleaving by using promise gates to ensure both eachMessage handlers have fired before either produces, reliably reproducing the bug.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
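The promise-gate technique this test relies on can be sketched as follows. Each handler opens its own gate on entry and waits on the other's gate before "producing", which guarantees both handlers are in flight concurrently before either proceeds. The names (`gate`, `handlerA`, `handlerB`) are illustrative, not the actual test code:

```javascript
'use strict'

// A gate is a promise plus the function that resolves it.
function gate () {
  let open
  const opened = new Promise(resolve => { open = resolve })
  return { opened, open }
}

const aStarted = gate()
const bStarted = gate()
const order = []

async function handlerA () {
  order.push('A:received')
  aStarted.open()
  await bStarted.opened // don't produce until B has also received
  order.push('A:produce')
}

async function handlerB () {
  order.push('B:received')
  bStarted.open()
  await aStarted.opened // don't produce until A has also received
  order.push('B:produce')
}

// Both "received" steps are guaranteed to happen before either "produce",
// which is exactly the interleaving that exposes the context leak.
const done = Promise.all([handlerA(), handlerB()]).then(() => order)
```

With this forced interleaving, any context set via `enterWith()` by one handler is observable in the other handler's continuation, making the leak deterministic instead of timing-dependent.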
test(dsm): add regression tests for DSM context leak in amqplib, bullmq, rhea, google-cloud-pubsub

Same root cause as the kafkajs test: ctx.currentStore is set by startSpan before decodeDataStreamsContext/setCheckpoint call enterWith, so the DSM context is never included in the bound store for async continuations. Verified deterministic: 10/10 failures across all plugins and versions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
test(rhea): remove rhea DSM regression test

Rhea's producer DSM checkpoint happens in a separate encode hook, not in bindStart/start, so the current fix doesn't cover it. Will be addressed in a separate PR.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
test(dsm): move google-cloud-pubsub changes to separate PR

Moved DSM context fix and regression test for google-cloud-pubsub to #7582 to fix independently.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
```js
if (!this.config.dsmEnabled) return

const { topic, messages, clusterId, disableHeaderInjection, currentStore: { span } } = ctx

for (const message of messages) {
  // …
```
It's a shame we need to iterate the message list twice now.
How many messages are usually in the list? 10? 100? more?
It seems like the default is 16kB, and the message count is however many messages fit into that limit?
The expensive part of a loop is its content, not the loop itself, so I would say having better separation of concerns is better. In the future I'd even want DSM to have separate plugins entirely that are not loaded at all when DSM is disabled.
Yeah, how many messages are present really depends on the client configuration. At least it exits early in the vast majority of cases, where DSM isn't enabled.
@codex review
Codex Review: Didn't find any major issues. Hooray!
* fix(kafkajs): sync DSM context to currentStore to prevent context leaking

  When processing concurrent Kafka messages, the DSM context was being set via enterWith() on AsyncLocalStorage but not synced to ctx.currentStore. Since ctx.currentStore is what gets returned from bindStart and bound to async continuations via runStores, this caused DSM context to leak between concurrent message handlers. The fix syncs the DSM context to ctx.currentStore after DSM operations complete, ensuring each handler's async continuations maintain the correct DSM context.

* test(dsm): add regression test for context propagation race condition

  Add tests that verify DSM context is properly scoped to each handler's async continuations when using diagnostic channels with runStores. The tests verify that:
  1. ctx.currentStore has dataStreamsContext after setDataStreamsContext
  2. Concurrent handlers maintain isolated DSM contexts
  3. DSM context persists through multiple async boundaries

* fix(dsm): sync DSM context to currentStore to prevent context leaking

  Add syncToStore helper in DSM context module that syncs DSM context from AsyncLocalStorage to ctx.currentStore after DSM operations. This fixes a race condition where DSM context was being set via enterWith() but not synced to ctx.currentStore, which is what gets bound to async continuations via store.run(). Without syncing, DSM context would leak between concurrent message handlers. Updated plugins:
  - kafkajs (consumer, producer)
  - amqplib (consumer, producer)
  - bullmq (consumer, producer)
  - rhea (consumer)
  - google-cloud-pubsub (consumer, producer)
  - aws-sdk (sqs, kinesis)

* remove comment

* chore: remove contrived regression test

  The unit test was useful for validating the fix during development but is contrived and doesn't add value as a permanent test.

* test(dsm): add unit tests for syncToStore and integration spy tests

  Add comprehensive tests for the new syncToStore helper:
  - Unit tests for syncToStore in context.spec.js covering normal operation, edge cases, and integration with setDataStreamsContext
  - Spy tests in 6 integration test files to verify syncToStore is called after produce and consume operations

* fix(dsm): call syncToStore via module object for testability

  Change all plugins to call DataStreamsContext.syncToStore(ctx) instead of destructuring syncToStore at import time. This allows sinon spies to intercept calls during testing. When functions are destructured at require-time, they bind to the original function reference. Spies set up later on the module object don't affect these bindings. Calling via the module object ensures spies work correctly.

* fix(dsm): remove invalid producer-side syncToStore tests for AWS SDK

  The syncToStore fix only applies to the consumer path where bindStart is used. AWS SDK plugins (SQS, Kinesis) use requestInject for producers, which doesn't need context synchronization since:
  1. requestInject is called before the request is sent
  2. DSM context is encoded directly into the message
  3. There's no async continuation where context leaking would occur

* chore: remove unnecessary comments from test files

* fix(dsm): move DSM logic from bindStart to start for correct context binding

  By moving DSM checkpoint/encoding logic into start() (which runs after the child context is bound), the DSM context naturally lands in the correct async store, eliminating the need for the syncToStore workaround in kafkajs, amqplib, bullmq, and rhea consumer plugins.

* fix(dsm): remove syncToStore functionality

  The actual fix for the context propagation race condition is moving DSM logic from bindStart to start. The syncToStore workaround is unnecessary.

* fix(dsm): move DSM logic from bindStart to start for google-cloud-pubsub

* chore: remove extra blank lines in google-cloud-pubsub

* chore: fix comma placement in amqplib producer setCheckpoint call

* test(kafkajs): add regression test for DSM context leak between concurrent consumers

  When two KafkaJS consumers process messages concurrently and each produces to a different topic, the DSM (Data Streams Monitoring) context leaks between them. The first consumer to process loses its DSM context entirely (null parent), while the second consumer picks up the first's context instead of its own. This test forces the interleaving by using promise gates to ensure both eachMessage handlers have fired before either produces, reliably reproducing the bug.

* test(kafkajs): remove redundant comments

* test(dsm): add regression tests for DSM context leak in amqplib, bullmq, rhea, google-cloud-pubsub

  Same root cause as the kafkajs test: ctx.currentStore is set by startSpan before decodeDataStreamsContext/setCheckpoint call enterWith, so the DSM context is never included in the bound store for async continuations. Verified deterministic: 10/10 failures across all plugins and versions.

* test(dsm): fix lint and remove redundant comments

* test(rhea): use const for senderAOut

* test(rhea): remove rhea DSM regression test

  Rhea's producer DSM checkpoint happens in a separate encode hook, not in bindStart/start, so the current fix doesn't cover it. Will be addressed in a separate PR.

* fix(rhea): remove rhea consumer changes, tracked separately in #7581

* test(dsm): move google-cloud-pubsub changes to separate PR

  Moved DSM context fix and regression test for google-cloud-pubsub to #7582 to fix independently.

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Summary
DSM context is now set in `start`, which runs inside the async context that `bindStart` establishes. Setting it in `bindStart` itself caused race conditions where the DSM context could leak across concurrent operations. This could cause DSM to read context from the wrong operation, leading to incorrect pathways and latencies. DSM logic moved from `bindStart` to `start` in the messaging plugins (kafkajs, amqplib, bullmq), which also fixes Confluent Kafka, since that plugin extends kafkajs.

Test plan
🤖 Generated with Claude Code