Skip to content

Commit 261e8ec

Browse files
authored
feat(ws): add websocket context propagation (#7077)
1 parent 244b082 commit 261e8ec

7 files changed

Lines changed: 407 additions & 3 deletions

File tree

packages/datadog-plugin-ws/src/close.js

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
'use strict'
22

33
const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js')
4+
const {
5+
incrementWebSocketCounter,
6+
buildWebSocketSpanPointerHash,
7+
hasDistributedTracingContext
8+
} = require('./util')
9+
const {
10+
WEBSOCKET_PTR_KIND,
11+
SPAN_POINTER_DIRECTION,
12+
SPAN_POINTER_DIRECTION_NAME
13+
} = require('../../dd-trace/src/constants')
414

515
class WSClosePlugin extends TracingPlugin {
616
static get id () { return 'ws' }
@@ -60,7 +70,52 @@ class WSClosePlugin extends TracingPlugin {
6070
end (ctx) {
6171
if (!Object.hasOwn(ctx, 'result') || !ctx.span) return
6272

63-
if (ctx.socket.spanContext) ctx.span.addLink({ context: ctx.socket.spanContext })
73+
if (ctx.socket.spanContext) {
74+
const linkAttributes = {}
75+
76+
// Determine link kind based on whether this is peer close (incoming) or self close (outgoing)
77+
const isIncoming = ctx.isPeerClose
78+
linkAttributes['dd.kind'] = isIncoming ? 'executed_by' : 'resuming'
79+
80+
// Add span pointer for context propagation
81+
if (this.config.traceWebsocketMessagesEnabled && ctx.socket.handshakeSpan) {
82+
const handshakeSpan = ctx.socket.handshakeSpan
83+
84+
// Only add span pointers if distributed tracing is enabled and handshake has distributed context
85+
if (hasDistributedTracingContext(handshakeSpan, ctx.socket)) {
86+
const counterType = isIncoming ? 'receiveCounter' : 'sendCounter'
87+
const counter = incrementWebSocketCounter(ctx.socket, counterType)
88+
const handshakeContext = handshakeSpan.context()
89+
90+
const ptrHash = buildWebSocketSpanPointerHash(
91+
handshakeContext._traceId,
92+
handshakeContext._spanId,
93+
counter,
94+
true, // isServer
95+
isIncoming
96+
)
97+
98+
const directionName = isIncoming
99+
? SPAN_POINTER_DIRECTION_NAME.UPSTREAM
100+
: SPAN_POINTER_DIRECTION_NAME.DOWNSTREAM
101+
const direction = isIncoming
102+
? SPAN_POINTER_DIRECTION.UPSTREAM
103+
: SPAN_POINTER_DIRECTION.DOWNSTREAM
104+
105+
// Add span pointer attributes to link
106+
linkAttributes['link.name'] = directionName
107+
linkAttributes['dd.kind'] = 'span-pointer'
108+
linkAttributes['ptr.kind'] = WEBSOCKET_PTR_KIND
109+
linkAttributes['ptr.dir'] = direction
110+
linkAttributes['ptr.hash'] = ptrHash
111+
}
112+
}
113+
114+
ctx.span.addLink({
115+
context: ctx.socket.spanContext,
116+
attributes: linkAttributes
117+
})
118+
}
64119

65120
ctx.span.finish()
66121
}

packages/datadog-plugin-ws/src/producer.js

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
'use strict'
22

33
const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js')
4+
const {
5+
incrementWebSocketCounter,
6+
buildWebSocketSpanPointerHash,
7+
hasDistributedTracingContext
8+
} = require('./util')
9+
const {
10+
WEBSOCKET_PTR_KIND,
11+
SPAN_POINTER_DIRECTION,
12+
SPAN_POINTER_DIRECTION_NAME
13+
} = require('../../dd-trace/src/constants')
414

515
class WSProducerPlugin extends TracingPlugin {
616
static get id () { return 'ws' }
@@ -51,9 +61,37 @@ class WSProducerPlugin extends TracingPlugin {
5161
if (!Object.hasOwn(ctx, 'result') || !ctx.span) return
5262

5363
if (ctx.socket.spanContext) {
64+
const linkAttributes = { 'dd.kind': 'resuming' }
65+
66+
// Add span pointer for context propagation
67+
if (this.config.traceWebsocketMessagesEnabled && ctx.socket.handshakeSpan) {
68+
const handshakeSpan = ctx.socket.handshakeSpan
69+
70+
// Only add span pointers if distributed tracing is enabled and handshake has distributed context
71+
if (hasDistributedTracingContext(handshakeSpan, ctx.socket)) {
72+
const counter = incrementWebSocketCounter(ctx.socket, 'sendCounter')
73+
const handshakeContext = handshakeSpan.context()
74+
75+
const ptrHash = buildWebSocketSpanPointerHash(
76+
handshakeContext._traceId,
77+
handshakeContext._spanId,
78+
counter,
79+
true, // isServer
80+
false // isIncoming (this is outgoing)
81+
)
82+
83+
// Add span pointer attributes to link
84+
linkAttributes['link.name'] = SPAN_POINTER_DIRECTION_NAME.DOWNSTREAM
85+
linkAttributes['dd.kind'] = 'span-pointer'
86+
linkAttributes['ptr.kind'] = WEBSOCKET_PTR_KIND
87+
linkAttributes['ptr.dir'] = SPAN_POINTER_DIRECTION.DOWNSTREAM
88+
linkAttributes['ptr.hash'] = ptrHash
89+
}
90+
}
91+
5492
ctx.span.addLink({
5593
context: ctx.socket.spanContext,
56-
attributes: { 'dd.kind': 'resuming' },
94+
attributes: linkAttributes,
5795
})
5896
}
5997

packages/datadog-plugin-ws/src/receiver.js

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
'use strict'
22

33
const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js')
4+
const {
5+
incrementWebSocketCounter,
6+
buildWebSocketSpanPointerHash,
7+
hasDistributedTracingContext
8+
} = require('./util')
9+
const {
10+
WEBSOCKET_PTR_KIND,
11+
SPAN_POINTER_DIRECTION,
12+
SPAN_POINTER_DIRECTION_NAME
13+
} = require('../../dd-trace/src/constants')
414

515
class WSReceiverPlugin extends TracingPlugin {
616
static get id () { return 'ws' }
@@ -61,9 +71,37 @@ class WSReceiverPlugin extends TracingPlugin {
6171
if (!Object.hasOwn(ctx, 'result') || !ctx.span) return
6272

6373
if (ctx.socket.spanContext) {
74+
const linkAttributes = { 'dd.kind': 'executed_by' }
75+
76+
// Add span pointer for context propagation
77+
if (this.config.traceWebsocketMessagesEnabled && ctx.socket.handshakeSpan) {
78+
const handshakeSpan = ctx.socket.handshakeSpan
79+
80+
// Only add span pointers if distributed tracing is enabled and handshake has distributed context
81+
if (hasDistributedTracingContext(handshakeSpan, ctx.socket)) {
82+
const counter = incrementWebSocketCounter(ctx.socket, 'receiveCounter')
83+
const handshakeContext = handshakeSpan.context()
84+
85+
const ptrHash = buildWebSocketSpanPointerHash(
86+
handshakeContext._traceId,
87+
handshakeContext._spanId,
88+
counter,
89+
true, // isServer
90+
true // isIncoming
91+
)
92+
93+
// Add span pointer attributes to link
94+
linkAttributes['link.name'] = SPAN_POINTER_DIRECTION_NAME.UPSTREAM
95+
linkAttributes['dd.kind'] = 'span-pointer'
96+
linkAttributes['ptr.kind'] = WEBSOCKET_PTR_KIND
97+
linkAttributes['ptr.dir'] = SPAN_POINTER_DIRECTION.UPSTREAM
98+
linkAttributes['ptr.hash'] = ptrHash
99+
}
100+
}
101+
64102
ctx.span.addLink({
65103
context: ctx.socket.spanContext,
66-
attributes: { 'dd.kind': 'executed_by' },
104+
attributes: linkAttributes,
67105
})
68106
}
69107

packages/datadog-plugin-ws/src/server.js

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
const TracingPlugin = require('../../dd-trace/src/plugins/tracing.js')
44
const tags = require('../../../ext/tags.js')
5+
const { initWebSocketMessageCounters } = require('./util')
6+
const { FORMAT_HTTP_HEADERS } = require('../../../ext/formats')
57

68
const HTTP_STATUS_CODE = tags.HTTP_STATUS_CODE
79

@@ -28,9 +30,13 @@ class WSServerPlugin extends TracingPlugin {
2830

2931
ctx.args = { options }
3032

33+
// Extract distributed tracing context from request headers
34+
const childOf = this.tracer.extract(FORMAT_HTTP_HEADERS, req.headers)
35+
3136
const service = this.serviceName({ pluginConfig: this.config })
3237
const span = this.startSpan(this.operationName(), {
3338
service,
39+
childOf,
3440
meta: {
3541
'span.type': 'websocket',
3642
'http.upgraded': 'websocket',
@@ -46,6 +52,13 @@ class WSServerPlugin extends TracingPlugin {
4652

4753
ctx.socket.spanContext = ctx.span._spanContext
4854
ctx.socket.spanContext.spanTags = ctx.span._spanContext._tags
55+
// Store the handshake span for use in message span pointers
56+
ctx.socket.handshakeSpan = ctx.span
57+
// Store the request headers for distributed tracing check
58+
ctx.socket.requestHeaders = req.headers
59+
60+
// Initialize message counters for span pointers
61+
initWebSocketMessageCounters(ctx.socket)
4962

5063
return ctx.currentStore
5164
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
'use strict'
2+
3+
// WeakMap to store message counters per socket without mutating the socket object
4+
const socketCounters = new WeakMap()
5+
6+
/**
7+
* Initializes WebSocket message counters for a socket.
8+
* @param {object} socket - The WebSocket socket object
9+
*/
10+
function initWebSocketMessageCounters (socket) {
11+
if (!socketCounters.has(socket)) {
12+
socketCounters.set(socket, {
13+
receiveCounter: 0,
14+
sendCounter: 0
15+
})
16+
}
17+
}
18+
19+
/**
20+
* Increments and returns the WebSocket message counter.
21+
* @param {object} socket - The WebSocket socket object
22+
* @param {string} counterType - Either 'receiveCounter' or 'sendCounter'
23+
* @returns {number} The incremented counter value
24+
*/
25+
function incrementWebSocketCounter (socket, counterType) {
26+
if (!socketCounters.has(socket)) {
27+
initWebSocketMessageCounters(socket)
28+
}
29+
const counters = socketCounters.get(socket)
30+
counters[counterType]++
31+
return counters[counterType]
32+
}
33+
34+
/**
35+
* Builds a WebSocket span pointer hash.
36+
*
37+
* Format: <prefix><128 bit hex trace id><64 bit hex span id><32 bit hex counter>
38+
* Prefix: 'S' for server outgoing or client incoming, 'C' for server incoming or client outgoing
39+
*
40+
* @param {bigint} handshakeTraceId - The trace ID from the handshake span (as a BigInt)
41+
* @param {bigint} handshakeSpanId - The span ID from the handshake span (as a BigInt)
42+
* @param {number} counter - The message counter
43+
* @param {boolean} isServer - Whether this is a server (true) or client (false)
44+
* @param {boolean} isIncoming - Whether this is an incoming message (true) or outgoing (false)
45+
* @returns {string} The span pointer hash
46+
*/
47+
function buildWebSocketSpanPointerHash (handshakeTraceId, handshakeSpanId, counter, isServer, isIncoming) {
48+
// Determine prefix based on server/client and incoming/outgoing
49+
// Server outgoing or client incoming: 'S'
50+
// Server incoming or client outgoing: 'C'
51+
const prefix = (isServer && !isIncoming) || (!isServer && isIncoming) ? 'S' : 'C'
52+
53+
// Pad trace ID to 32 hex chars (128 bits)
54+
const traceIdHex = handshakeTraceId.toString(16).padStart(32, '0')
55+
56+
// Pad span ID to 16 hex chars (64 bits)
57+
const spanIdHex = handshakeSpanId.toString(16).padStart(16, '0')
58+
59+
// Pad counter to 8 hex chars (32 bits)
60+
const counterHex = counter.toString(16).padStart(8, '0')
61+
62+
return `${prefix}${traceIdHex}${spanIdHex}${counterHex}`
63+
}
64+
65+
/**
66+
* Checks if the handshake span has extracted distributed tracing context.
67+
* A websocket server must not set the span pointer if the handshake has not extracted a context.
68+
*
69+
* A span has distributed tracing context if it has a parent context that was
70+
* extracted from headers (remote parent).
71+
*
72+
* @param {object} span - The handshake span
73+
* @param {object} socket - The WebSocket socket object
74+
* @returns {boolean} True if the span has distributed tracing context
75+
*/
76+
function hasDistributedTracingContext (span, socket) {
77+
if (!span) return false
78+
const context = span.context()
79+
if (!context) return false
80+
81+
// Check if this span has a parent. If the parent was extracted from remote headers,
82+
// then this span is part of a distributed trace.
83+
// We check if the span has a parent by looking at _parentId.
84+
// In the JavaScript tracer, when a context is extracted from headers and a child span
85+
// is created, the child will have _parentId set to the extracted parent's span ID.
86+
//
87+
// For testing purposes, we also check if Datadog trace headers are present in the socket's
88+
// upgrade request, which indicates distributed tracing context was sent by the client.
89+
if (context._parentId !== null) {
90+
return true
91+
}
92+
93+
// Fallback check: look for distributed tracing headers in the stored request headers
94+
if (socket && socket.requestHeaders) {
95+
const headers = socket.requestHeaders
96+
return !!(headers['x-datadog-trace-id'] || headers.traceparent)
97+
}
98+
99+
return false
100+
}
101+
102+
module.exports = {
103+
initWebSocketMessageCounters,
104+
incrementWebSocketCounter,
105+
buildWebSocketSpanPointerHash,
106+
hasDistributedTracingContext
107+
}

0 commit comments

Comments
 (0)