Skip to content

Commit 22211b0

Browse files
authored
fix: use sync call and use proper span context for Azure's EventHubs (#6943)
An incorrect usage of tracePromise on a synchronous function is fixed as well using the proper context for spans. Formerly Azure's EventHubs batch context was used for that.
1 parent 9f56752 commit 22211b0

9 files changed

Lines changed: 217 additions & 16 deletions

File tree

integration-tests/helpers/index.js

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -571,14 +571,26 @@ function checkSpansForServiceName (spans, name) {
571571
* @param {(data: Buffer) => void} [stdioHandler]
572572
* @param {Record<string, string|undefined>} [additionalEnvArgs]
573573
*/
574-
async function spawnPluginIntegrationTestProc (cwd, serverFile, agentPort, stdioHandler, additionalEnvArgs) {
574+
async function spawnPluginIntegrationTestProc (
575+
cwd, serverFile, agentPort, stdioHandler, additionalEnvArgs) {
575576
if (typeof stdioHandler !== 'function' && !additionalEnvArgs) {
576577
additionalEnvArgs = stdioHandler
577578
stdioHandler = undefined
578579
}
579-
additionalEnvArgs = additionalEnvArgs || {}
580+
additionalEnvArgs = { ...additionalEnvArgs }
581+
582+
let NODE_OPTIONS = `--loader=${hookFile}`
583+
if (additionalEnvArgs.NODE_OPTIONS !== undefined) {
584+
if (/--(loader|import)/.test(additionalEnvArgs.NODE_OPTIONS ?? '')) {
585+
NODE_OPTIONS = additionalEnvArgs.NODE_OPTIONS
586+
} else {
587+
NODE_OPTIONS += ` ${additionalEnvArgs.NODE_OPTIONS}`
588+
}
589+
delete additionalEnvArgs.NODE_OPTIONS
590+
}
591+
580592
let env = /** @type {Record<string, string|undefined>} */ ({
581-
NODE_OPTIONS: `--loader=${hookFile}`,
593+
NODE_OPTIONS,
582594
DD_TRACE_AGENT_PORT: String(agentPort),
583595
DD_TRACE_FLUSH_INTERVAL: '0'
584596
})

packages/datadog-instrumentations/src/azure-event-hubs.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ addHook({
1313
versions: ['>=6.0.0']
1414
}, obj => {
1515
const EventHubProducerClient = obj.EventHubProducerClient
16+
1617
shimmer.wrap(EventHubProducerClient.prototype, 'createBatch',
1718
createBatch => async function () {
1819
const batch = await createBatch.apply(this, arguments)
1920
shimmer.wrap(batch, 'tryAdd',
2021
tryAdd => function (eventData) {
2122
const config = this._context.config
2223
const functionName = tryAdd.name
23-
return producerCh.tracePromise(
24+
return producerCh.traceSync(
2425
tryAdd,
2526
{ functionName, eventData, batch: this, config },
2627
this, ...arguments)

packages/datadog-plugin-azure-event-hubs/src/producer.js

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
const { getEnvironmentVariable } = require('../../dd-trace/src/config-helper')
44
const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
55

6+
const spanContexts = new WeakMap()
7+
68
class AzureEventHubsProducerPlugin extends ProducerPlugin {
79
static get id () { return 'azure-event-hubs' }
810
static get operation () { return 'send' }
@@ -36,7 +38,12 @@ class AzureEventHubsProducerPlugin extends ProducerPlugin {
3638
}
3739

3840
if (batchLinksAreEnabled()) {
39-
ctx.batch._spanContexts.push(span.context())
41+
const spanContext = spanContexts.get(ctx.batch)
42+
if (spanContext) {
43+
spanContext.push(span.context())
44+
} else {
45+
spanContexts.set(ctx.batch, [span.context()])
46+
}
4047
injectTraceContext(this.tracer, span, ctx.eventData)
4148
}
4249
}
@@ -53,17 +60,24 @@ class AzureEventHubsProducerPlugin extends ProducerPlugin {
5360
})
5461
} else {
5562
if (batchLinksAreEnabled()) {
56-
eventData._spanContexts.forEach(spanContext => {
57-
span.addLink(spanContext)
58-
})
63+
const contexts = spanContexts.get(eventData)
64+
if (contexts) {
65+
for (const spanContext of contexts) {
66+
span.addLink(spanContext)
67+
}
68+
}
5969
}
6070
}
6171
}
6272
return ctx.currentStore
6373
}
6474

6575
asyncEnd (ctx) {
66-
super.finish()
76+
super.finish(ctx)
77+
}
78+
79+
end (ctx) {
80+
super.finish(ctx)
6781
}
6882
}
6983

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
'use strict'
2+
3+
const {
4+
FakeAgent,
5+
sandboxCwd,
6+
useSandbox,
7+
spawnPluginIntegrationTestProc
8+
} = require('../../../../../integration-tests/helpers')
9+
const { withVersions } = require('../../../../dd-trace/test/setup/mocha')
10+
const assert = require('assert')
11+
12+
describe('esm', () => {
13+
let agent
14+
let proc
15+
let spawnEnv
16+
17+
withVersions('azure-event-hubs', '@azure/event-hubs', version => {
18+
useSandbox([`'@azure/event-hubs@${version}'`], false, [
19+
'./packages/datadog-plugin-azure-event-hubs/test/integration-test/batchSpanContextRegressionTest/*'])
20+
21+
beforeEach(async () => {
22+
agent = await new FakeAgent().start()
23+
process.env.DD_TRACE_DISABLED_PLUGINS = 'amqplib,amqp10,rhea,net'
24+
spawnEnv = { DD_TRACE_FLUSH_INTERVAL: '2000', NODE_OPTIONS: '--experimental-global-webcrypto' }
25+
})
26+
27+
afterEach(async () => {
28+
proc && proc.kill()
29+
await agent.stop()
30+
})
31+
32+
it('tryAdd does not set context in the Azure eventDataBatch._spanContext', async () => {
33+
const res = agent.assertMessageReceived(({ headers, payload }) => {
34+
assert.ok(Array.isArray(payload))
35+
assert.strictEqual(payload.length, 3)
36+
// Verify we got the expected spans from the test
37+
assert.strictEqual(payload[0][0].name, 'azure.eventhubs.create')
38+
assert.strictEqual(payload[1][0].name, 'azure.eventhubs.create')
39+
assert.strictEqual(payload[2][0].name, 'azure.eventhubs.send')
40+
})
41+
42+
// This test file will throw an error if tryAdd returns a Promise instead of a boolean
43+
proc = await spawnPluginIntegrationTestProc(sandboxCwd(), 'server.mjs', agent.port, spawnEnv)
44+
await res
45+
}).timeout(60000)
46+
})
47+
})
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import 'dd-trace/init.js'
2+
import { EventHubProducerClient } from '@azure/event-hubs'
3+
4+
const connectionString = 'Endpoint=sb://127.0.0.1:5673;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;'
5+
const eventHubName = 'eh1'
6+
7+
const producer = new EventHubProducerClient(connectionString, eventHubName)
8+
9+
const events = [
10+
{ body: 'Test event 1' },
11+
{ body: 'Test event 2' }
12+
]
13+
14+
// Test that tryAdd returns a boolean, not a Promise
15+
const batch = await producer.createBatch()
16+
17+
batch.tryAdd(events[0])
18+
batch.tryAdd(events[1])
19+
20+
if (batch._spanContexts.length !== 0) {
21+
throw new Error(
22+
"We should not be using Azure's eventDataBatchspan context. Please use the weak map instead."
23+
)
24+
}
25+
26+
// Send the batch to complete the operation
27+
await producer.sendBatch(batch)
28+
29+
await producer.close()

packages/datadog-plugin-azure-event-hubs/test/integration-test/client.spec.js renamed to packages/datadog-plugin-azure-event-hubs/test/integration-test/core-test/client.spec.js

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,22 @@ const {
1010
assertObjectContains,
1111
checkSpansForServiceName,
1212
spawnPluginIntegrationTestProc
13-
} = require('../../../../integration-tests/helpers')
14-
const { withVersions } = require('../../../dd-trace/test/setup/mocha')
13+
} = require('../../../../../integration-tests/helpers')
14+
const { withVersions } = require('../../../../dd-trace/test/setup/mocha')
1515

16-
const spawnEnv = { DD_TRACE_FLUSH_INTERVAL: '2000' }
17-
18-
// TODO: Fix this test / esm issue
19-
describe.skip('esm', () => {
16+
describe('esm', () => {
2017
let agent
2118
let proc
19+
let spawnEnv
2220

2321
withVersions('azure-event-hubs', '@azure/event-hubs', version => {
2422
useSandbox([`'@azure/event-hubs@${version}'`], false, [
25-
'./packages/datadog-plugin-azure-event-hubs/test/integration-test/*'])
23+
'./packages/datadog-plugin-azure-event-hubs/test/integration-test/core-test/*'])
2624

2725
beforeEach(async () => {
2826
agent = await new FakeAgent().start()
2927
process.env.DD_TRACE_DISABLED_PLUGINS = 'amqplib,amqp10,rhea,net'
28+
spawnEnv = { DD_TRACE_FLUSH_INTERVAL: '2000', NODE_OPTIONS: '--experimental-global-webcrypto' }
3029
})
3130

3231
afterEach(async () => {

packages/datadog-plugin-azure-event-hubs/test/integration-test/server.mjs renamed to packages/datadog-plugin-azure-event-hubs/test/integration-test/core-test/server.mjs

File renamed without changes.
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
'use strict'
2+
3+
const {
4+
FakeAgent,
5+
sandboxCwd,
6+
useSandbox,
7+
spawnPluginIntegrationTestProc
8+
} = require('../../../../../integration-tests/helpers')
9+
const { withVersions } = require('../../../../dd-trace/test/setup/mocha')
10+
const assert = require('assert')
11+
12+
describe('esm', () => {
13+
let agent
14+
let proc
15+
let spawnEnv
16+
17+
withVersions('azure-event-hubs', '@azure/event-hubs', version => {
18+
useSandbox([`'@azure/event-hubs@${version}'`], false, [
19+
'./packages/datadog-plugin-azure-event-hubs/test/integration-test/tryAddRegressionTest/*'])
20+
21+
beforeEach(async () => {
22+
agent = await new FakeAgent().start()
23+
process.env.DD_TRACE_DISABLED_PLUGINS = 'amqplib,amqp10,rhea,net'
24+
spawnEnv = { DD_TRACE_FLUSH_INTERVAL: '2000', NODE_OPTIONS: '--experimental-global-webcrypto' }
25+
})
26+
27+
afterEach(async () => {
28+
proc && proc.kill()
29+
await agent.stop()
30+
})
31+
32+
it('tryAdd returns a boolean, not a Promise', async () => {
33+
const res = agent.assertMessageReceived(({ headers, payload }) => {
34+
assert.ok(Array.isArray(payload))
35+
assert.strictEqual(payload.length, 3)
36+
// Verify we got the expected spans from the test
37+
assert.strictEqual(payload[0][0].name, 'azure.eventhubs.create')
38+
assert.strictEqual(payload[1][0].name, 'azure.eventhubs.create')
39+
assert.strictEqual(payload[2][0].name, 'azure.eventhubs.send')
40+
})
41+
42+
// This test file will throw an error if tryAdd returns a Promise instead of a boolean
43+
proc = await spawnPluginIntegrationTestProc(
44+
sandboxCwd(), 'server.mjs', agent.port, undefined, spawnEnv
45+
)
46+
47+
await res
48+
}).timeout(60000)
49+
})
50+
})
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import 'dd-trace/init.js'
2+
import { EventHubProducerClient } from '@azure/event-hubs'
3+
4+
const connectionString = 'Endpoint=sb://127.0.0.1:5673;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;'
5+
const eventHubName = 'eh1'
6+
7+
const producer = new EventHubProducerClient(connectionString, eventHubName)
8+
9+
const events = [
10+
{ body: 'Test event 1' },
11+
{ body: 'Test event 2' }
12+
]
13+
14+
// Test that tryAdd returns a boolean, not a Promise
15+
const batch = await producer.createBatch()
16+
17+
const result1 = batch.tryAdd(events[0])
18+
const result2 = batch.tryAdd(events[1])
19+
20+
// Verify the return types - throw error if not correct
21+
if (typeof result1 !== 'boolean') {
22+
throw new Error(`tryAdd should return a boolean, but returned ${typeof result1}`)
23+
}
24+
25+
if (result1 instanceof Promise) {
26+
throw new Error('tryAdd should not return a Promise')
27+
}
28+
29+
if (typeof result2 !== 'boolean') {
30+
throw new Error(`tryAdd should return a boolean, but returned ${typeof result2}`)
31+
}
32+
33+
if (result2 instanceof Promise) {
34+
throw new Error('tryAdd should not return a Promise')
35+
}
36+
37+
// Verify the values are correct
38+
if (result1 !== true) {
39+
throw new Error(`Expected first tryAdd to return true, got ${result1}`)
40+
}
41+
42+
if (result2 !== true) {
43+
throw new Error(`Expected second tryAdd to return true, got ${result2}`)
44+
}
45+
46+
// Send the batch to complete the operation
47+
await producer.sendBatch(batch)
48+
49+
await producer.close()

0 commit comments

Comments
 (0)