Skip to content

Commit 25b7924

Browse files
BridgeARQard
andauthored
feat(instrumentation): support light-my-request inject (#7155)
Some frameworks like fastify include request injection for communicating between services in the same process. This needs special handling to treat the inject calls like regular http requests. Co-authored-by: Stephen Belanger <admin@stephenbelanger.com>
1 parent b00ab50 commit 25b7924

4 files changed

Lines changed: 373 additions & 0 deletions

File tree

packages/datadog-instrumentations/src/helpers/hooks.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ module.exports = {
8383
kafkajs: () => require('../kafkajs'),
8484
langchain: () => require('../langchain'),
8585
ldapjs: () => require('../ldapjs'),
86+
'light-my-request': () => require('../light-my-request'),
8687
'limitd-client': () => require('../limitd-client'),
8788
lodash: () => require('../lodash'),
8889
mariadb: () => require('../mariadb'),
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
'use strict'
2+
3+
/**
4+
* light-my-request instrumentation
5+
*
6+
* This instrumentation enables dd-trace to capture spans for Fastify inject() calls
7+
* which use light-my-request internally. Without this, inject() bypasses the HTTP
8+
* server instrumentation since it doesn't go through http.Server.emit('request').
9+
*
10+
* This is critical for platforms like Platformatic that use undici-thread-interceptor
11+
* to route requests between worker threads using Fastify inject().
12+
*/
13+
14+
const {
15+
channel,
16+
addHook
17+
} = require('./helpers/instrument')
18+
const shimmer = require('../../datadog-shimmer')
19+
20+
// Reuse the same channels as HTTP server instrumentation
21+
const startServerCh = channel('apm:http:server:request:start')
22+
const exitServerCh = channel('apm:http:server:request:exit')
23+
const errorServerCh = channel('apm:http:server:request:error')
24+
const finishServerCh = channel('apm:http:server:request:finish')
25+
26+
addHook({ name: 'light-my-request', versions: ['>=3'] }, (lightMyRequest) => {
27+
// Wrap the inject function
28+
return shimmer.wrapFunction(lightMyRequest, lightMyRequest => {
29+
return function wrappedInject (dispatchFunc, options, callback) {
30+
// If no subscribers, use original behavior
31+
if (!startServerCh.hasSubscribers) {
32+
return lightMyRequest.apply(this, arguments)
33+
}
34+
35+
// Wrap the dispatch function to add tracing
36+
const wrappedDispatch = wrapDispatchFunc(dispatchFunc)
37+
38+
// Call original with wrapped dispatch
39+
return lightMyRequest.call(this, wrappedDispatch, options, callback)
40+
}
41+
})
42+
})
43+
44+
function wrapDispatchFunc (dispatchFunc) {
45+
return function tracedDispatch (req, res) {
46+
const abortController = new AbortController()
47+
48+
// Link res.req like HTTP instrumentation does
49+
res.req = req
50+
51+
// Publish start event (same as HTTP server)
52+
startServerCh.publish({ req, res, abortController })
53+
54+
// Track when response finishes via 'finish' event (like HTTP instrumentation)
55+
let finishCalled = false
56+
const onFinish = () => {
57+
if (!finishCalled) {
58+
finishCalled = true
59+
finishServerCh.publish({ req })
60+
}
61+
}
62+
63+
// light-my-request Response emits 'finish' when done
64+
if (res.on && typeof res.on === 'function') {
65+
res.on('finish', onFinish)
66+
res.on('close', onFinish)
67+
}
68+
69+
// Also wrap end() as fallback
70+
const originalEnd = res.end
71+
if (originalEnd) {
72+
res.end = function wrappedEnd () {
73+
const result = originalEnd.apply(this, arguments)
74+
// Trigger finish if events don't fire
75+
setImmediate(onFinish)
76+
return result
77+
}
78+
}
79+
80+
try {
81+
if (abortController.signal.aborted) {
82+
return
83+
}
84+
85+
return dispatchFunc.call(this, req, res)
86+
} catch (err) {
87+
errorServerCh.publish(err)
88+
throw err
89+
} finally {
90+
exitServerCh.publish({ req })
91+
}
92+
}
93+
}
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
'use strict'
2+
3+
const assert = require('node:assert')
4+
const dc = require('dc-polyfill')
5+
const { describe, it, before, after, beforeEach, afterEach } = require('mocha')
6+
const sinon = require('sinon')
7+
8+
const agent = require('../../dd-trace/test/plugins/agent')
9+
10+
describe('light-my-request instrumentation', () => {
11+
const startServerCh = dc.channel('apm:http:server:request:start')
12+
const exitServerCh = dc.channel('apm:http:server:request:exit')
13+
const finishServerCh = dc.channel('apm:http:server:request:finish')
14+
const errorServerCh = dc.channel('apm:http:server:request:error')
15+
16+
let startStub, exitStub, finishStub, errorStub
17+
let inject, Fastify
18+
19+
before(async () => {
20+
await agent.load(['http', 'fastify', 'light-my-request'], { client: false })
21+
inject = require('light-my-request')
22+
Fastify = require('fastify')
23+
})
24+
25+
after(() => {
26+
return agent.close({ ritmReset: false })
27+
})
28+
29+
beforeEach(() => {
30+
startStub = sinon.stub()
31+
exitStub = sinon.stub()
32+
finishStub = sinon.stub()
33+
errorStub = sinon.stub()
34+
35+
startServerCh.subscribe(startStub)
36+
exitServerCh.subscribe(exitStub)
37+
finishServerCh.subscribe(finishStub)
38+
errorServerCh.subscribe(errorStub)
39+
})
40+
41+
afterEach(() => {
42+
startServerCh.unsubscribe(startStub)
43+
exitServerCh.unsubscribe(exitStub)
44+
finishServerCh.unsubscribe(finishStub)
45+
errorServerCh.unsubscribe(errorStub)
46+
})
47+
48+
describe('with Fastify inject()', () => {
49+
let app
50+
51+
beforeEach(async () => {
52+
app = Fastify()
53+
app.get('/test', async (req, reply) => {
54+
return { success: true }
55+
})
56+
app.get('/error', async (req, reply) => {
57+
throw new Error('Test error')
58+
})
59+
await app.ready()
60+
})
61+
62+
afterEach(async () => {
63+
await app.close()
64+
})
65+
66+
it('should publish to start channel on inject', async () => {
67+
await app.inject({
68+
method: 'GET',
69+
url: '/test'
70+
})
71+
72+
sinon.assert.called(startStub)
73+
74+
// Find the call for our /test request (filter out any dd-trace internal requests)
75+
const testCall = startStub.getCalls().find(call => {
76+
const { req } = call.args[0]
77+
return req.url === '/test'
78+
})
79+
80+
assert(testCall, 'start channel should be called for /test request')
81+
const { req, res, abortController } = testCall.args[0]
82+
assert.strictEqual(req.url, '/test')
83+
assert.strictEqual(req.method, 'GET')
84+
assert(res, 'res should be provided')
85+
assert(abortController instanceof AbortController, 'abortController should be provided')
86+
})
87+
88+
it('should publish to exit channel after inject dispatch', async () => {
89+
await app.inject({
90+
method: 'GET',
91+
url: '/test'
92+
})
93+
94+
sinon.assert.called(exitStub)
95+
96+
const testCall = exitStub.getCalls().find(call => {
97+
const { req } = call.args[0]
98+
return req.url === '/test'
99+
})
100+
101+
assert(testCall, 'exit channel should be called for /test request')
102+
})
103+
104+
it('should publish to finish channel when response completes', async () => {
105+
await app.inject({
106+
method: 'GET',
107+
url: '/test'
108+
})
109+
110+
// Wait a tick for finish event to propagate
111+
await new Promise(resolve => setImmediate(resolve))
112+
113+
sinon.assert.called(finishStub)
114+
115+
const testCall = finishStub.getCalls().find(call => {
116+
const { req } = call.args[0]
117+
return req.url === '/test'
118+
})
119+
120+
assert(testCall, 'finish channel should be called for /test request')
121+
})
122+
123+
it('should link res.req for context tracking', async () => {
124+
let capturedRes
125+
126+
const handler = ({ req, res }) => {
127+
if (req.url === '/test') {
128+
capturedRes = res
129+
}
130+
}
131+
startServerCh.subscribe(handler)
132+
133+
try {
134+
await app.inject({
135+
method: 'GET',
136+
url: '/test'
137+
})
138+
139+
assert(capturedRes, 'response should be captured')
140+
assert(capturedRes.req, 'res.req should be set')
141+
assert.strictEqual(capturedRes.req.url, '/test')
142+
} finally {
143+
startServerCh.unsubscribe(handler)
144+
}
145+
})
146+
147+
it('should provide abortController to subscribers', async () => {
148+
let capturedAbortController
149+
150+
const handler = ({ req, abortController }) => {
151+
if (req.url === '/test') {
152+
capturedAbortController = abortController
153+
}
154+
}
155+
startServerCh.subscribe(handler)
156+
157+
try {
158+
await app.inject({
159+
method: 'GET',
160+
url: '/test'
161+
})
162+
163+
assert(capturedAbortController, 'abortController should be captured')
164+
assert(capturedAbortController instanceof AbortController, 'should be an AbortController')
165+
assert.strictEqual(typeof capturedAbortController.abort, 'function', 'should have abort method')
166+
} finally {
167+
startServerCh.unsubscribe(handler)
168+
}
169+
})
170+
171+
it('should pass request headers correctly', async () => {
172+
let capturedHeaders
173+
174+
const handler = ({ req }) => {
175+
if (req.url === '/test') {
176+
capturedHeaders = req.headers
177+
}
178+
}
179+
startServerCh.subscribe(handler)
180+
181+
try {
182+
await app.inject({
183+
method: 'GET',
184+
url: '/test',
185+
headers: {
186+
'x-custom-header': 'test-value',
187+
'x-trace-id': '12345'
188+
}
189+
})
190+
191+
assert(capturedHeaders, 'headers should be captured')
192+
assert.strictEqual(capturedHeaders['x-custom-header'], 'test-value')
193+
assert.strictEqual(capturedHeaders['x-trace-id'], '12345')
194+
} finally {
195+
startServerCh.unsubscribe(handler)
196+
}
197+
})
198+
199+
it('should work with different HTTP methods', async () => {
200+
const methods = []
201+
202+
// Create a new app with all routes registered before ready()
203+
const multiMethodApp = Fastify()
204+
multiMethodApp.get('/multi', async () => ({ method: 'GET' }))
205+
multiMethodApp.post('/multi', async () => ({ method: 'POST' }))
206+
multiMethodApp.put('/multi', async () => ({ method: 'PUT' }))
207+
multiMethodApp.delete('/multi', async () => ({ method: 'DELETE' }))
208+
await multiMethodApp.ready()
209+
210+
const handler = ({ req }) => {
211+
if (req.url === '/multi') {
212+
methods.push(req.method)
213+
}
214+
}
215+
startServerCh.subscribe(handler)
216+
217+
try {
218+
await multiMethodApp.inject({ method: 'GET', url: '/multi' })
219+
await multiMethodApp.inject({ method: 'POST', url: '/multi' })
220+
await multiMethodApp.inject({ method: 'PUT', url: '/multi' })
221+
await multiMethodApp.inject({ method: 'DELETE', url: '/multi' })
222+
223+
assert.deepStrictEqual(methods, ['GET', 'POST', 'PUT', 'DELETE'])
224+
} finally {
225+
startServerCh.unsubscribe(handler)
226+
await multiMethodApp.close()
227+
}
228+
})
229+
})
230+
231+
describe('with standalone light-my-request', () => {
232+
it('should instrument direct inject() calls', async () => {
233+
const dispatchFunc = (req, res) => {
234+
res.writeHead(200, { 'Content-Type': 'application/json' })
235+
res.end(JSON.stringify({ ok: true }))
236+
}
237+
238+
await inject(dispatchFunc, {
239+
method: 'GET',
240+
url: '/standalone-test'
241+
})
242+
243+
sinon.assert.called(startStub)
244+
245+
const testCall = startStub.getCalls().find(call => {
246+
const { req } = call.args[0]
247+
return req.url === '/standalone-test'
248+
})
249+
250+
assert(testCall, 'start channel should be called for standalone inject')
251+
})
252+
253+
it('should work with callback style', (done) => {
254+
const dispatchFunc = (req, res) => {
255+
res.writeHead(200)
256+
res.end('OK')
257+
}
258+
259+
inject(dispatchFunc, { method: 'GET', url: '/callback-test' }, (err, response) => {
260+
if (err) return done(err)
261+
262+
try {
263+
sinon.assert.called(startStub)
264+
265+
const testCall = startStub.getCalls().find(call => {
266+
const { req } = call.args[0]
267+
return req.url === '/callback-test'
268+
})
269+
270+
assert(testCall, 'start channel should be called for callback-style inject')
271+
done()
272+
} catch (e) {
273+
done(e)
274+
}
275+
})
276+
})
277+
})
278+
})

packages/dd-trace/test/plugins/versions/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132
"langchain": "1.0.3",
133133
"ldapjs": "3.0.7",
134134
"ldapjs-promise": "3.0.7",
135+
"light-my-request": "6.6.0",
135136
"limitd-client": "2.14.1",
136137
"lodash": "4.17.21",
137138
"loopback": "3.28.0",

0 commit comments

Comments
 (0)