Skip to content

Commit 9ed88dc

Browse files
authored
tracing(kafka): add batched tracing for kafka consumes (kafkajs | confluent-kafka) (#7479)
- add batched tracing for kafka consumes
- fix batch messages to prevent mutations between test cases
1 parent 4cc7d88 commit 9ed88dc

3 files changed

Lines changed: 227 additions & 4 deletions

File tree

packages/datadog-plugin-confluentinc-kafka-javascript/test/index.spec.js

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,85 @@ describe('Plugin', () => {
252252
return expectedSpanPromise
253253
})
254254
})
255+
256+
// Batched-consume instrumentation tests for @confluentinc/kafka-javascript
// (kafkaJS compatibility layer). Reconstructed from the commit diff: the
// extracted page interleaved diff line numbers with the code.
describe('consumer (eachBatch)', () => {
  let consumer
  let batchMessages

  beforeEach(async () => {
    // Re-created per test so no test can mutate another's messages.
    batchMessages = [{ key: 'key1', value: 'test2' }, { key: 'key2', value: 'test3' }]
    consumer = kafka.consumer({
      kafkaJS: { groupId, fromBeginning: true, autoCommit: false },
    })
    await consumer.connect()
    await consumer.subscribe({ topic: testTopic })
  })

  afterEach(async () => {
    await consumer.disconnect()
  })

  it('should be instrumented', async () => {
    const expectedSpanPromise = expectSpanWithDefaults({
      name: expectedSchema.receive.opName,
      service: expectedSchema.receive.serviceName,
      meta: {
        'span.kind': 'consumer',
        component: 'confluentinc-kafka-javascript',
        'kafka.topic': testTopic,
        'messaging.destination.name': testTopic,
        'messaging.system': 'kafka',
      },
      resource: testTopic,
      error: 0,
      type: 'worker',
    })

    await consumer.run({ eachBatch: () => {} })
    return Promise.all([sendMessages(kafka, testTopic, batchMessages), expectedSpanPromise])
  })

  it('should run the consumer in the context of the consumer span', done => {
    const firstSpan = tracer.scope().active()

    let eachBatch = async ({ batch }) => {
      const currentSpan = tracer.scope().active()

      try {
        // The handler must run inside a fresh consumer span, not the
        // span (if any) that was active before consumption started.
        assert.notEqual(currentSpan, firstSpan)
        assert.strictEqual(currentSpan.context()._name, expectedSchema.receive.opName)
        done()
      } catch (e) {
        done(e)
      } finally {
        eachBatch = () => {} // avoid being called for each message
      }
    }

    consumer.run({ eachBatch: (...args) => eachBatch(...args) })
      .then(() => sendMessages(kafka, testTopic, batchMessages))
      .catch(done)
  })

  it('should propagate context via span links', async () => {
    const expectedSpanPromise = agent.assertSomeTraces(traces => {
      const span = traces[0][0]
      const links = span.meta['_dd.span_links'] ? JSON.parse(span.meta['_dd.span_links']) : []

      assertObjectContains(span, {
        name: expectedSchema.receive.opName,
        service: expectedSchema.receive.serviceName,
        resource: testTopic,
      })

      // librdkafka may deliver messages across multiple batches,
      // so each batch span will have links for the messages it received.
      assert.ok(links.length >= 1, `expected at least 1 span link, got ${links.length}`)
    })

    await consumer.run({ eachBatch: () => {} })
    await Promise.all([sendMessages(kafka, testTopic, batchMessages), expectedSpanPromise])
  })
})
255334
})
256335

257336
// Adding tests for the native API

packages/datadog-plugin-kafkajs/src/batch-consumer.js

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,47 @@ class KafkajsBatchConsumerPlugin extends ConsumerPlugin {
88
static id = 'kafkajs'
99
static operation = 'consume-batch'
1010

11-
start (ctx) {
12-
const { topic, messages, groupId, clusterId } = ctx.extractedArgs || ctx
11+
bindStart (ctx) {
12+
const { topic, partition, messages, groupId, clusterId } = ctx.extractedArgs || ctx
13+
14+
const span = this.startSpan({
15+
resource: topic,
16+
type: 'worker',
17+
meta: {
18+
component: this.constructor.id,
19+
'kafka.topic': topic,
20+
'kafka.cluster_id': clusterId,
21+
'messaging.destination.name': topic,
22+
'messaging.system': 'kafka',
23+
},
24+
metrics: {
25+
'kafka.partition': partition,
26+
'messaging.batch.message_count': messages.length,
27+
},
28+
}, ctx)
1329

14-
if (!this.config.dsmEnabled) return
1530
for (const message of messages) {
1631
if (!message || !message.headers) continue
32+
33+
const headers = convertToTextMap(message.headers)
34+
if (headers) {
35+
const childOf = this.tracer.extract('text_map', headers)
36+
if (childOf) {
37+
span.addLink(childOf)
38+
}
39+
}
40+
41+
if (!this.config.dsmEnabled) continue
1742
const payloadSize = getMessageSize(message)
18-
this.tracer.decodeDataStreamsContext(convertToTextMap(message.headers))
43+
this.tracer.decodeDataStreamsContext(headers)
1944
const edgeTags = ['direction:in', `group:${groupId}`, `topic:${topic}`, 'type:kafka']
2045
if (clusterId) {
2146
edgeTags.push(`kafka_cluster_id:${clusterId}`)
2247
}
2348
this.tracer.setCheckpoint(edgeTags, null, payloadSize)
2449
}
50+
51+
return ctx.currentStore
2552
}
2653
}
2754

packages/datadog-plugin-kafkajs/test/index.spec.js

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,123 @@ describe('Plugin', () => {
404404
rawExpectedSchema.receive
405405
)
406406
})
407+
408+
// Batched-consume instrumentation tests for kafkajs. Reconstructed from the
// commit diff: the extracted page interleaved diff line numbers with the code.
describe('consumer (eachBatch)', () => {
  let consumer
  const batchMessages = [{ key: 'key1', value: 'test2' }, { key: 'key2', value: 'test3' }]

  beforeEach(async () => {
    consumer = kafka.consumer({ groupId: 'test-group' })
    await consumer.connect()
    await consumer.subscribe({ topic: testTopic, fromBeginning: true })
  })

  afterEach(async () => {
    await consumer.disconnect()
  })

  it('should be instrumented', async () => {
    const meta = {
      'span.kind': 'consumer',
      component: 'kafkajs',
      'kafka.topic': testTopic,
      'messaging.destination.name': testTopic,
      'messaging.system': 'kafka',
    }
    // cluster id is only reported when the broker exposes it
    if (clusterIdAvailable) meta['kafka.cluster_id'] = testKafkaClusterId

    const expectedSpanPromise = expectSpanWithDefaults({
      name: expectedSchema.receive.opName,
      service: expectedSchema.receive.serviceName,
      meta,
      metrics: {
        'messaging.batch.message_count': batchMessages.length,
      },
      resource: testTopic,
      error: 0,
      type: 'worker',
    })

    await consumer.run({
      eachBatch: () => {},
    })
    return Promise.all([sendMessages(kafka, testTopic, batchMessages), expectedSpanPromise])
  })

  it('should run the consumer in the context of the consumer span', done => {
    const firstSpan = tracer.scope().active()

    let eachBatch = async ({ batch }) => {
      const currentSpan = tracer.scope().active()

      try {
        // The handler must run inside a fresh consumer span, not the
        // span (if any) that was active before consumption started.
        assert.notEqual(currentSpan, firstSpan)
        assert.strictEqual(currentSpan.context()._name, expectedSchema.receive.opName)
        done()
      } catch (e) {
        done(e)
      } finally {
        eachBatch = () => {} // avoid being called for each message
      }
    }

    consumer.run({ eachBatch: (...args) => eachBatch(...args) })
      .then(() => sendMessages(kafka, testTopic, batchMessages))
      .catch(done)
  })

  it('should propagate context via span links', async () => {
    const expectedSpanPromise = agent.assertSomeTraces(traces => {
      const span = traces[0][0]
      const links = span.meta['_dd.span_links'] ? JSON.parse(span.meta['_dd.span_links']) : []

      assertObjectContains(span, {
        name: expectedSchema.receive.opName,
        service: expectedSchema.receive.serviceName,
        resource: testTopic,
      })

      // kafkajs delivers the whole send as one batch: one link per message.
      assert.strictEqual(links.length, batchMessages.length)
    })

    await consumer.run({ eachBatch: () => {} })
    await Promise.all([sendMessages(kafka, testTopic, batchMessages), expectedSpanPromise])
  })

  it('should not fail when messages have headers without trace context', async () => {
    const messagesWithHeaders = [
      { key: 'key1', value: 'test1', headers: { 'x-custom-header': 'value' } },
    ]
    const meta = {
      'span.kind': 'consumer',
      component: 'kafkajs',
      'kafka.topic': testTopic,
      'messaging.destination.name': testTopic,
      'messaging.system': 'kafka',
    }
    if (clusterIdAvailable) meta['kafka.cluster_id'] = testKafkaClusterId

    const expectedSpanPromise = expectSpanWithDefaults({
      name: expectedSchema.receive.opName,
      service: expectedSchema.receive.serviceName,
      meta,
      resource: testTopic,
      error: 0,
      type: 'worker',
    })

    await consumer.run({ eachBatch: () => {} })
    return Promise.all([sendMessages(kafka, testTopic, messagesWithHeaders), expectedSpanPromise])
  })

  withNamingSchema(
    async () => {
      await consumer.run({ eachBatch: () => {} })
      await sendMessages(kafka, testTopic, batchMessages)
    },
    rawExpectedSchema.receive
  )
})
407524
})
408525
})
409526
})

0 commit comments

Comments (0)