Skip to content

Commit 631fb6a

Browse files
tlhunterclaude
andauthored
feat(dsm,dbm): add process tags support for enhanced trace correlation (#7212)
Add propagation hash support to Data Streams Monitoring (DSM) and Database Monitoring (DBM) to enable correlation between traces, database operations, and data stream pathways using process and container metadata. The propagation hash is an FNV-1a 64-bit hash combining process tags (entrypoint info, package.json name) with container tags received from the Datadog agent. This hash is included in DSM pathway computations and DBM SQL comments to enable enhanced observability and trace correlation. Key changes: - Create propagation-hash module for FNV-1a hash computation with caching - Update DSM pathway hash computation to include propagation hash - Add ProcessTags field to DSM payloads sent to agent - Add ddsh parameter to DBM SQL comments containing propagation hash - Capture container tags from agent response headers - Feature is opt-in via DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED This aligns with similar implementations in dd-trace-py (#15356), dd-trace-java (#9282), and dd-trace-rb (#5208, #5209). Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 495b56a commit 631fb6a

15 files changed

Lines changed: 802 additions & 18 deletions

File tree

index.d.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -768,6 +768,19 @@ declare namespace tracer {
768768
*/
769769
dsmEnabled?: boolean
770770

771+
/**
772+
* Configuration for Database Monitoring (DBM).
773+
*/
774+
dbm?: {
775+
/**
776+
* Controls whether to inject the SQL base hash (propagation hash) in DBM SQL comments.
777+
* This option requires DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED=true to take effect.
778+
* The propagation hash enables correlation between traces and database operations.
779+
* @default false
780+
*/
781+
injectSqlBaseHash?: boolean
782+
}
783+
771784
/**
772785
* Configuration of the AppSec protection. Can be a boolean as an alias to `appsec.enabled`.
773786
*/

packages/dd-trace/src/config/defaults.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ module.exports = {
6262
'codeOriginForSpans.enabled': true,
6363
'codeOriginForSpans.experimental.exit_spans.enabled': false,
6464
dbmPropagationMode: 'disabled',
65+
'dbm.injectSqlBaseHash': false,
6566
'dogstatsd.hostname': '127.0.0.1',
6667
'dogstatsd.port': '8125',
6768
dsmEnabled: false,

packages/dd-trace/src/config/index.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ class Config {
283283
DD_CODE_ORIGIN_FOR_SPANS_EXPERIMENTAL_EXIT_SPANS_ENABLED,
284284
DD_DATA_STREAMS_ENABLED,
285285
DD_DBM_PROPAGATION_MODE,
286+
DD_DBM_INJECT_SQL_BASEHASH,
286287
DD_DOGSTATSD_HOST,
287288
DD_DOGSTATSD_PORT,
288289
DD_DYNAMIC_INSTRUMENTATION_CAPTURE_TIMEOUT_MS,
@@ -574,6 +575,7 @@ class Config {
574575
DD_CODE_ORIGIN_FOR_SPANS_EXPERIMENTAL_EXIT_SPANS_ENABLED
575576
)
576577
setString(target, 'dbmPropagationMode', DD_DBM_PROPAGATION_MODE)
578+
setBoolean(target, 'dbm.injectSqlBaseHash', DD_DBM_INJECT_SQL_BASEHASH)
577579
setString(target, 'dogstatsd.hostname', DD_DOGSTATSD_HOST)
578580
setString(target, 'dogstatsd.port', DD_DOGSTATSD_PORT)
579581
setBoolean(target, 'dsmEnabled', DD_DATA_STREAMS_ENABLED)
@@ -900,6 +902,7 @@ class Config {
900902
options.codeOriginForSpans?.experimental?.exit_spans?.enabled
901903
)
902904
setString(opts, 'dbmPropagationMode', options.dbmPropagationMode)
905+
setBoolean(opts, 'dbm.injectSqlBaseHash', options.dbm?.injectSqlBaseHash)
903906
if (options.dogstatsd) {
904907
setString(opts, 'dogstatsd.hostname', options.dogstatsd.hostname)
905908
setString(opts, 'dogstatsd.port', options.dogstatsd.port)

packages/dd-trace/src/config/supported-configurations.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
"DD_CUSTOM_TRACE_ID": ["A"],
6565
"DD_DATA_STREAMS_ENABLED": ["A"],
6666
"DD_DBM_PROPAGATION_MODE": ["A"],
67+
"DD_DBM_INJECT_SQL_BASEHASH": ["A"],
6768
"DD_DOGSTATSD_HOST": ["A"],
6869
"DD_DOGSTATSD_PORT": ["A"],
6970
"DD_DYNAMIC_INSTRUMENTATION_CAPTURE_TIMEOUT_MS": ["A"],

packages/dd-trace/src/datastreams/pathway.js

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,36 @@ function shaHash (checkpointString) {
2626
* @param {string} env
2727
* @param {string[]} edgeTags
2828
* @param {Buffer} parentHash
29+
* @param {bigint | null} propagationHashBigInt - Optional propagation hash for process/container tags
2930
*/
30-
function computeHash (service, env, edgeTags, parentHash) {
31+
function computeHash (service, env, edgeTags, parentHash, propagationHashBigInt = null) {
3132
edgeTags.sort()
3233
const hashableEdgeTags = edgeTags.filter(item => item !== 'manual_checkpoint:true')
3334

34-
const key = `${service}${env}${hashableEdgeTags.join('')}${parentHash}`
35+
// Cache key includes parentHash to handle fan-in/fan-out scenarios where the same
36+
// service+env+tags+propagationHash can have different parents. This ensures we cache
37+
// the complete pathway context, not just the current node's identity.
38+
const propagationPart = propagationHashBigInt ? `:${propagationHashBigInt.toString(16)}` : ''
39+
const key = `${service}${env}${hashableEdgeTags.join('')}${parentHash}${propagationPart}`
40+
3541
let value = cache.get(key)
3642
if (value) {
3743
return value
3844
}
39-
const currentHash = shaHash(`${service}${env}` + hashableEdgeTags.join(''))
45+
46+
// Key vs hashInput distinction:
47+
// - 'key' (above) is used for caching and includes parentHash to differentiate pathways
48+
// with the same node but different parents (e.g., multiple queues feeding one consumer)
49+
// - 'hashInput' (below) excludes parentHash to compute only the current node's identity hash,
50+
// which is then XORed with parentHash (line 54) to build the complete pathway hash
51+
// This two-step approach (hash current node independently, then combine with parent) is
52+
// required for proper pathway construction in the DSM protocol.
53+
const baseString = `${service}${env}` + hashableEdgeTags.join('')
54+
const hashInput = propagationHashBigInt
55+
? `${baseString}:${propagationHashBigInt.toString(16)}`
56+
: baseString
57+
58+
const currentHash = shaHash(hashInput)
4059
const buf = Buffer.concat([currentHash, parentHash], 16)
4160
value = shaHash(buf.toString())
4261
cache.set(key, value)

packages/dd-trace/src/datastreams/processor.js

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ const pkg = require('../../../../package.json')
66
const { LogCollapsingLowestDenseDDSketch } = require('../../../../vendor/dist/@datadog/sketches-js')
77
const { PATHWAY_HASH } = require('../../../../ext/tags')
88
const log = require('../log')
9+
const processTags = require('../process-tags')
10+
const propagationHash = require('../propagation-hash')
911
const { DsmPathwayCodec } = require('./pathway')
1012
const { DataStreamsWriter } = require('./writer')
1113
const { computePathwayHash } = require('./pathway')
@@ -162,6 +164,7 @@ class DataStreamsProcessor {
162164
onInterval () {
163165
const { Stats } = this._serializeBuckets()
164166
if (Stats.length === 0) return
167+
165168
const payload = {
166169
Env: this.env,
167170
Service: this.service,
@@ -171,6 +174,12 @@ class DataStreamsProcessor {
171174
Lang: 'javascript',
172175
Tags: Object.entries(this.tags).map(([key, value]) => `${key}:${value}`),
173176
}
177+
178+
// Add ProcessTags only if feature is enabled and process tags exist
179+
if (propagationHash.isEnabled() && processTags.serialized) {
180+
payload.ProcessTags = processTags.serialized.split(',')
181+
}
182+
174183
this.writer.flush(payload)
175184
}
176185

@@ -234,7 +243,11 @@ class DataStreamsProcessor {
234243
edgeTags
235244
)
236245
}
237-
const hash = computePathwayHash(this.service, this.env, edgeTags, parentHash)
246+
247+
// Get propagation hash if enabled
248+
const propagationHashValue = propagationHash.isEnabled() ? propagationHash.getHash() : null
249+
250+
const hash = computePathwayHash(this.service, this.env, edgeTags, parentHash, propagationHashValue)
238251
const edgeLatencyNs = nowNs - edgeStartNs
239252
const pathwayLatencyNs = nowNs - pathwayStartNs
240253
const dataStreamsContext = {

packages/dd-trace/src/exporters/agent/writer.js

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
'use strict'
22

3-
const { inspect } = require('util')
4-
53
const request = require('../common/request')
64
const { startupLog } = require('../../startup-log')
75
const runtimeMetrics = require('../../runtime_metrics')
86
const log = require('../../log')
97
const tracerVersion = require('../../../../../package.json').version
108
const BaseWriter = require('../common/writer')
9+
const propagationHash = require('../../propagation-hash')
1110

1211
const METRIC_PREFIX = 'datadog.tracer.node.exporter.agent'
1312

@@ -29,10 +28,7 @@ class AgentWriter extends BaseWriter {
2928
runtimeMetrics.increment(`${METRIC_PREFIX}.requests`, true)
3029

3130
const { _headers, _lookup, _protocolVersion, _url } = this
32-
makeRequest(_protocolVersion, data, count, _url, _headers, _lookup, (err, res, status) => {
33-
// Note that logging will only happen once, regardless of how many times this is called.
34-
startupLog(status !== 404 && status !== 200 ? { status, message: err?.message ?? inspect(err) } : undefined)
35-
31+
makeRequest(_protocolVersion, data, count, _url, _headers, _lookup, true, (err, res, status, headers) => {
3632
if (status) {
3733
runtimeMetrics.increment(`${METRIC_PREFIX}.responses`, true)
3834
runtimeMetrics.increment(`${METRIC_PREFIX}.responses.by.status`, `status:${status}`, true)
@@ -53,6 +49,16 @@ class AgentWriter extends BaseWriter {
5349

5450
log.debug('Response from the agent: %s', res)
5551

52+
// Capture container tags hash from agent response headers
53+
// The hash is sent by the agent only when Datadog-Container-ID is present in the request
54+
// (Datadog-Container-ID is automatically injected by docker.inject() in exporters/common/request.js)
55+
if (headers) {
56+
const containerTagsHash = headers['Datadog-Container-Tags-Hash']
57+
if (containerTagsHash) {
58+
propagationHash.updateContainerTagsHash(containerTagsHash)
59+
}
60+
}
61+
5662
try {
5763
this._prioritySampler.update(JSON.parse(res).rate_by_service)
5864
} catch (e) {
@@ -72,7 +78,7 @@ function getEncoder (protocolVersion) {
7278
: require('../../encode/0.4').AgentEncoder
7379
}
7480

75-
function makeRequest (version, data, count, url, headers, lookup, cb) {
81+
function makeRequest (version, data, count, url, headers, lookup, needsStartupLog, cb) {
7682
const options = {
7783
path: `/v${version}/traces`,
7884
method: 'PUT',
@@ -91,7 +97,15 @@ function makeRequest (version, data, count, url, headers, lookup, cb) {
9197

9298
log.debug('Request to the agent: %j', options)
9399

94-
request(data, options, cb)
100+
request(data, options, (err, res, status, headers) => {
101+
if (needsStartupLog) {
102+
// Note that logging will only happen once, regardless of how many times this is called.
103+
startupLog({
104+
agentError: status !== 404 && status !== 200 ? err : undefined,
105+
})
106+
}
107+
cb(err, res, status, headers)
108+
})
95109
}
96110

97111
module.exports = AgentWriter

packages/dd-trace/src/exporters/common/request.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,13 @@ function request (data, options, callback) {
8888
zlib.gunzip(buffer, (err, result) => {
8989
if (err) {
9090
log.error('Could not gunzip response: %s', err.message)
91-
callback(null, '', res.statusCode)
91+
callback(null, '', res.statusCode, res.headers)
9292
} else {
93-
callback(null, result.toString(), res.statusCode)
93+
callback(null, result.toString(), res.statusCode, res.headers)
9494
}
9595
})
9696
} else {
97-
callback(null, buffer.toString(), res.statusCode)
97+
callback(null, buffer.toString(), res.statusCode, res.headers)
9898
}
9999
} else {
100100
let errorMessage = ''
@@ -115,7 +115,7 @@ function request (data, options, callback) {
115115
const error = new log.NoTransmitError(errorMessage)
116116
error.status = res.statusCode
117117

118-
callback(error, null, res.statusCode)
118+
callback(error, null, res.statusCode, res.headers)
119119
}
120120
})
121121
}

packages/dd-trace/src/plugins/database.js

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict'
22

33
const { PEER_SERVICE_KEY, PEER_SERVICE_SOURCE_KEY } = require('../constants')
4+
const propagationHash = require('../propagation-hash')
45
const StoragePlugin = require('./storage')
56

67
class DatabasePlugin extends StoragePlugin {
@@ -59,13 +60,25 @@ class DatabasePlugin extends StoragePlugin {
5960
const dbmService = this.#getDbmServiceName(serviceName, peerData)
6061
const servicePropagation = this.#createDBMPropagationCommentService(dbmService, span, peerData)
6162

63+
let dbmComment = servicePropagation
64+
65+
// Add propagation hash if both process tags and SQL base hash injection are enabled
66+
if (propagationHash.isEnabled() && this.config['dbm.injectSqlBaseHash']) {
67+
const hashBase64 = propagationHash.getHashBase64()
68+
if (hashBase64) {
69+
dbmComment += `,ddsh='${hashBase64}'`
70+
// Add hash to span meta as a tag
71+
span.setTag('_dd.dbm.propagation_hash', hashBase64)
72+
}
73+
}
74+
6275
if (disableFullMode || mode === 'service') {
63-
return servicePropagation
76+
return dbmComment
6477
} else if (mode === 'full') {
6578
span.setTag('_dd.dbm_trace_injected', 'true')
6679
span._processor.sample(span)
6780
const traceparent = span._spanContext.toTraceparent()
68-
return `${servicePropagation},traceparent='${traceparent}'`
81+
return `${dbmComment},traceparent='${traceparent}'`
6982
}
7083
}
7184

0 commit comments

Comments
 (0)