Skip to content

Commit f5bec49

Browse files
authored
[DSM] Fix an issue where RabbitMQ producers when producing a message to the default exchange were setting checkpoints that didn't work in DSM (#5150)
1 parent 4f22cf7 commit f5bec49

2 files changed

Lines changed: 47 additions & 9 deletions

File tree

packages/datadog-plugin-amqplib/src/producer.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,17 @@ class AmqplibProducerPlugin extends ProducerPlugin {
3636
if (this.config.dsmEnabled) {
3737
const hasRoutingKey = fields.routingKey != null
3838
const payloadSize = getAmqpMessageSize({ content: message, headers: fields.headers })
39+
40+
// there are two ways to send messages in RabbitMQ:
41+
// 1. using an exchange and a routing key in which DSM connects via the exchange
42+
// 2. using an unnamed exchange and a routing key in which DSM connects via the topic
43+
const exchangeOrTopicTag = hasRoutingKey && !fields.exchange
44+
? `topic:${fields.routingKey}`
45+
: `exchange:${fields.exchange}`
46+
3947
const dataStreamsContext = this.tracer
4048
.setCheckpoint(
41-
['direction:out', `exchange:${fields.exchange}`, `has_routing_key:${hasRoutingKey}`, 'type:rabbitmq']
49+
['direction:out', exchangeOrTopicTag, `has_routing_key:${hasRoutingKey}`, 'type:rabbitmq']
4250
, span, payloadSize)
4351
DsmPathwayCodec.encode(dataStreamsContext, fields.headers)
4452
}

packages/datadog-plugin-amqplib/test/index.spec.js

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -306,8 +306,10 @@ describe('Plugin', () => {
306306
describe('when data streams monitoring is enabled', function () {
307307
this.timeout(10000)
308308

309-
const expectedProducerHash = '17191234428405871432'
310-
const expectedConsumerHash = '18277095184718602853'
309+
const expectedProducerHashWithTopic = '16804605750389532869'
310+
const expectedProducerHashWithExchange = '2722596631431228032'
311+
312+
const expectedConsumerHash = '17529824252700998941'
311313

312314
before(() => {
313315
tracer = require('../../dd-trace')
@@ -322,7 +324,7 @@ describe('Plugin', () => {
322324
return agent.close({ ritmReset: false })
323325
})
324326

325-
it('Should emit DSM stats to the agent when sending a message', done => {
327+
it('Should emit DSM stats to the agent when sending a message on an unnamed exchange', done => {
326328
agent.expectPipelineStats(dsmStats => {
327329
let statsPointsReceived = []
328330
// we should have 1 dsm stats points
@@ -336,11 +338,11 @@ describe('Plugin', () => {
336338
expect(statsPointsReceived.length).to.be.at.least(1)
337339
expect(statsPointsReceived[0].EdgeTags).to.deep.equal([
338340
'direction:out',
339-
'exchange:',
340341
'has_routing_key:true',
342+
'topic:testDSM',
341343
'type:rabbitmq'
342344
])
343-
expect(agent.dsmStatsExist(agent, expectedProducerHash)).to.equal(true)
345+
expect(agent.dsmStatsExist(agent, expectedProducerHashWithTopic)).to.equal(true)
344346
}, { timeoutMs: 10000 }).then(done, done)
345347

346348
channel.assertQueue('testDSM', {}, (err, ok) => {
@@ -350,6 +352,34 @@ describe('Plugin', () => {
350352
})
351353
})
352354

355+
it('Should emit DSM stats to the agent when sending a message on an named exchange', done => {
356+
agent.expectPipelineStats(dsmStats => {
357+
let statsPointsReceived = []
358+
// we should have 1 dsm stats points
359+
dsmStats.forEach((timeStatsBucket) => {
360+
if (timeStatsBucket && timeStatsBucket.Stats) {
361+
timeStatsBucket.Stats.forEach((statsBuckets) => {
362+
statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
363+
})
364+
}
365+
})
366+
expect(statsPointsReceived.length).to.be.at.least(1)
367+
expect(statsPointsReceived[0].EdgeTags).to.deep.equal([
368+
'direction:out',
369+
'exchange:namedExchange',
370+
'has_routing_key:true',
371+
'type:rabbitmq'
372+
])
373+
expect(agent.dsmStatsExist(agent, expectedProducerHashWithExchange)).to.equal(true)
374+
}, { timeoutMs: 10000 }).then(done, done)
375+
376+
channel.assertExchange('namedExchange', 'direct', {}, (err, ok) => {
377+
if (err) return done(err)
378+
379+
channel.publish('namedExchange', 'anyOldRoutingKey', Buffer.from('DSM pathway test'))
380+
})
381+
})
382+
353383
it('Should emit DSM stats to the agent when receiving a message', done => {
354384
agent.expectPipelineStats(dsmStats => {
355385
let statsPointsReceived = []
@@ -390,11 +420,11 @@ describe('Plugin', () => {
390420
expect(statsPointsReceived.length).to.be.at.least(1)
391421
expect(statsPointsReceived[0].EdgeTags).to.deep.equal([
392422
'direction:out',
393-
'exchange:',
394423
'has_routing_key:true',
424+
'topic:testDSM',
395425
'type:rabbitmq'
396426
])
397-
expect(agent.dsmStatsExist(agent, expectedProducerHash)).to.equal(true)
427+
expect(agent.dsmStatsExist(agent, expectedProducerHashWithTopic)).to.equal(true)
398428
}, { timeoutMs: 10000 }).then(done, done)
399429

400430
channel.assertQueue('testDSM', {}, (err, ok) => {
@@ -445,7 +475,7 @@ describe('Plugin', () => {
445475
}
446476

447477
expect(produceSpanMeta).to.include({
448-
'pathway.hash': expectedProducerHash
478+
'pathway.hash': expectedProducerHashWithTopic
449479
})
450480
}, { timeoutMs: 10000 }).then(done, done)
451481
})

0 commit comments

Comments
 (0)