Skip to content

Commit edb4236

Browse files
[test optimization] Add support for --workerThreads flag in jest (#7840)
1 parent 51e9264 commit edb4236

5 files changed

Lines changed: 150 additions & 26 deletions

File tree

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
'use strict'
2+
3+
const assert = require('node:assert/strict')
4+
const { parentPort } = require('node:worker_threads')
5+
6+
if (parentPort) {
7+
parentPort.postMessage({ source: 'jest-worker-message' })
8+
}
9+
10+
describe('jest-worker-message', () => {
11+
it('passes after sending a non-array worker message', () => {
12+
assert.strictEqual(true, true)
13+
})
14+
})

integration-tests/ci-visibility/run-jest.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ if (process.env.RUN_IN_PARALLEL) {
2121
options.maxWorkers = 2
2222
}
2323

24+
if (process.env.USE_WORKER_THREADS) {
25+
delete options.runInBand
26+
options.maxWorkers = 2
27+
options.workerThreads = true
28+
}
29+
2430
if (process.env.OLD_RUNNER) {
2531
options.testRunner = 'jest-jasmine2'
2632
}

integration-tests/jest/jest.spec.js

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -998,6 +998,93 @@ describe(`jest@${JEST_VERSION} commonJS`, () => {
998998
})
999999
})
10001000

1001+
context('when jest is using worker threads', () => {
1002+
onlyLatestIt('ignores non-array worker-thread messages', (done) => {
1003+
childProcess = fork(testFile, {
1004+
cwd,
1005+
env: {
1006+
...getCiVisAgentlessConfig(receiver.port),
1007+
TESTS_TO_RUN: 'jest-plugin-tests/jest-worker-message',
1008+
USE_WORKER_THREADS: 'true',
1009+
},
1010+
stdio: 'pipe',
1011+
})
1012+
childProcess.stdout?.on('data', (chunk) => {
1013+
testOutput += chunk.toString()
1014+
})
1015+
childProcess.stderr?.on('data', (chunk) => {
1016+
testOutput += chunk.toString()
1017+
})
1018+
1019+
Promise.all([
1020+
once(childProcess, 'message'),
1021+
receiver.gatherPayloads(({ url }) => url === '/api/v2/citestcycle', 5000),
1022+
]).then(([, eventsRequests]) => {
1023+
const tests = eventsRequests.map(({ payload }) => payload)
1024+
.flatMap(({ events }) => events)
1025+
.filter(event => event.type === 'test')
1026+
.map(event => event.content)
1027+
1028+
assert.strictEqual(tests.length, 1)
1029+
assert.strictEqual(
1030+
tests[0].meta[TEST_NAME],
1031+
'jest-worker-message passes after sending a non-array worker message'
1032+
)
1033+
assert.strictEqual(tests[0].meta[TEST_STATUS], 'pass')
1034+
assert.doesNotMatch(testOutput, /TypeError/)
1035+
done()
1036+
}).catch(done)
1037+
})
1038+
1039+
onlyLatestIt('reports tests when using agentless', (done) => {
1040+
childProcess = fork(testFile, {
1041+
cwd,
1042+
env: {
1043+
...getCiVisAgentlessConfig(receiver.port),
1044+
USE_WORKER_THREADS: 'true',
1045+
},
1046+
stdio: 'pipe',
1047+
})
1048+
1049+
receiver.gatherPayloads(({ url }) => url === '/api/v2/citestcycle', 5000).then(eventsRequests => {
1050+
const events = eventsRequests.map(({ payload }) => payload)
1051+
.flatMap(({ events }) => events)
1052+
const eventTypes = events.map(event => event.type)
1053+
assertObjectContains(eventTypes, ['test', 'test_suite_end', 'test_session_end', 'test_module_end'])
1054+
1055+
const tests = events.filter(event => event.type === 'test').map(event => event.content)
1056+
assert.ok(tests.length >= 2)
1057+
tests.forEach(testEvent => {
1058+
assert.strictEqual(testEvent.meta[TEST_STATUS], 'pass')
1059+
})
1060+
1061+
done()
1062+
}).catch(done)
1063+
})
1064+
1065+
onlyLatestIt('reports tests when using evp proxy', (done) => {
1066+
receiver.setInfoResponse({ endpoints: ['/evp_proxy/v2'] })
1067+
childProcess = fork(testFile, {
1068+
cwd,
1069+
env: {
1070+
...getCiVisEvpProxyConfig(receiver.port),
1071+
USE_WORKER_THREADS: 'true',
1072+
},
1073+
stdio: 'pipe',
1074+
})
1075+
1076+
receiver.gatherPayloads(({ url }) => url === '/evp_proxy/v2/api/v2/citestcycle', 5000)
1077+
.then(eventsRequests => {
1078+
const eventTypes = eventsRequests.map(({ payload }) => payload)
1079+
.flatMap(({ events }) => events)
1080+
.map(event => event.type)
1081+
1082+
assertObjectContains(eventTypes, ['test', 'test_suite_end', 'test_session_end', 'test_module_end'])
1083+
done()
1084+
}).catch(done)
1085+
})
1086+
})
1087+
10011088
it('reports timeout error message', (done) => {
10021089
childProcess = fork(testFile, {
10031090
cwd,

packages/datadog-instrumentations/src/jest.js

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1834,7 +1834,12 @@ addHook({
18341834

18351835
function onMessageWrapper (onMessage) {
18361836
return function () {
1837-
const [code, data] = arguments[0]
1837+
const response = arguments[0]
1838+
if (!Array.isArray(response)) {
1839+
return onMessage.apply(this, arguments)
1840+
}
1841+
1842+
const [code, data] = response
18381843
if (code === JEST_WORKER_TRACE_PAYLOAD_CODE) { // datadog trace payload
18391844
collectDynamicNamesFromTraces(data, newTestsWithDynamicNames)
18401845
workerReportTraceCh.publish(data)
@@ -1897,14 +1902,22 @@ function sendWrapper (send) {
18971902
}
18981903
}
18991904

1905+
function wrapWorker (worker) {
1906+
// ChildProcessWorker uses _child (child_process), ExperimentalWorker uses _worker (worker_threads)
1907+
const workerChannel = worker._child || worker._worker
1908+
if (!workerChannel) return
1909+
1910+
shimmer.wrap(workerChannel, worker._child ? 'send' : 'postMessage', sendWrapper)
1911+
shimmer.wrap(worker, '_onMessage', onMessageWrapper)
1912+
workerChannel.removeAllListeners('message')
1913+
workerChannel.on('message', worker._onMessage.bind(worker))
1914+
}
1915+
19001916
function enqueueWrapper (enqueue) {
19011917
return function () {
19021918
shimmer.wrap(arguments[0], 'onStart', onStart => function (worker) {
19031919
if (worker && !wrappedWorkers.has(worker)) {
1904-
shimmer.wrap(worker._child, 'send', sendWrapper)
1905-
shimmer.wrap(worker, '_onMessage', onMessageWrapper)
1906-
worker._child.removeAllListeners('message')
1907-
worker._child.on('message', worker._onMessage.bind(worker))
1920+
wrapWorker(worker)
19081921
wrappedWorkers.add(worker)
19091922
}
19101923
return onStart.apply(this, arguments)
@@ -1934,6 +1947,21 @@ addHook({
19341947
return childProcessWorker
19351948
})
19361949

1950+
addHook({
1951+
name: 'jest-worker',
1952+
versions: ['>=24.9.0 <30.0.0'],
1953+
file: 'build/workers/NodeThreadsWorker.js',
1954+
}, (nodeThreadsWorker) => {
1955+
const ExperimentalWorker = nodeThreadsWorker.default
1956+
shimmer.wrap(ExperimentalWorker.prototype, 'send', sendWrapper)
1957+
if (ExperimentalWorker.prototype._onMessage) {
1958+
shimmer.wrap(ExperimentalWorker.prototype, '_onMessage', onMessageWrapper)
1959+
} else if (ExperimentalWorker.prototype.onMessage) {
1960+
shimmer.wrap(ExperimentalWorker.prototype, 'onMessage', onMessageWrapper)
1961+
}
1962+
return nodeThreadsWorker
1963+
})
1964+
19371965
addHook({
19381966
name: 'jest-worker',
19391967
versions: ['>=30.0.0'],

packages/dd-trace/src/ci-visibility/exporters/test-worker/writer.js

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,6 @@
22
const { JSONEncoder } = require('../../encode/json-encoder')
33
const { getEnvironmentVariable } = require('../../../config/helper')
44
const log = require('../../../log')
5-
const {
6-
VITEST_WORKER_TRACE_PAYLOAD_CODE,
7-
VITEST_WORKER_LOGS_PAYLOAD_CODE,
8-
} = require('../../../plugins/util/test')
95

106
class Writer {
117
constructor (interprocessCode) {
@@ -29,12 +25,6 @@ class Writer {
2925
}
3026

3127
_sendPayload (data, onDone = () => {}) {
32-
// ## Jest
33-
// Only available when `child_process` is used for the jest worker.
34-
// If worker_threads is used, this will not work
35-
// TODO: make `jest` instrumentation compatible with worker_threads
36-
// https://github.com/facebook/jest/blob/bb39cb2c617a3334bf18daeca66bd87b7ccab28b/packages/jest-worker/README.md#experimental-worker
37-
3828
// ## Cucumber
3929
// This reports to the test's main process the same way test data is reported by Cucumber
4030
// See cucumber code:
@@ -47,29 +37,28 @@ class Writer {
4737
? { __tinypool_worker_message__: true, interprocessCode: this._interprocessCode, data }
4838
: [this._interprocessCode, data]
4939

50-
const isVitestTestWorker =
51-
this._interprocessCode === VITEST_WORKER_TRACE_PAYLOAD_CODE ||
52-
this._interprocessCode === VITEST_WORKER_LOGS_PAYLOAD_CODE
53-
40+
// child_process workers (jest default, cucumber)
5441
if (process.send) {
5542
process.send(payload, () => {
5643
onDone()
5744
})
58-
} else if (isVitestTestWorker) { // TODO: worker_threads are only supported in vitest right now
59-
const { isMainThread, parentPort } = require('worker_threads')
60-
if (isMainThread) {
61-
return onDone()
62-
}
45+
return
46+
}
47+
48+
// worker_threads (jest --workerThreads, vitest)
49+
const { isMainThread, parentPort } = require('node:worker_threads')
50+
if (!isMainThread && parentPort) {
6351
try {
6452
parentPort.postMessage(payload)
6553
} catch (error) {
6654
log.error('Error posting message to parent port', error)
6755
} finally {
6856
onDone()
6957
}
70-
} else {
71-
onDone()
58+
return
7259
}
60+
61+
onDone()
7362
}
7463
}
7564

0 commit comments

Comments
 (0)