diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 937f7826..31e9e8bf 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -40,7 +40,7 @@ jobs:
       target-name: 'dd_pprof' # target name in binding.gyp
       package-manager: 'npm' # npm or yarn
       cache: true # enable caching of dependencies based on lockfile
-      min-node-version: 16
+      min-node-version: 18
       skip: 'linux-arm,linux-ia32' # skip building for these platforms

  dev_publish:
diff --git a/.github/workflows/package-size.yml b/.github/workflows/package-size.yml
index b346fd30..df44eb5e 100644
--- a/.github/workflows/package-size.yml
+++ b/.github/workflows/package-size.yml
@@ -19,7 +19,7 @@ jobs:
       - name: Setup Node.js
         uses: actions/setup-node@v2
         with:
-          node-version: '16'
+          node-version: '22'
       - run: yarn
       - name: Compute module size tree and report
         uses: qard/heaviest-objects-in-the-universe@v1
diff --git a/bindings/profilers/wall.cc b/bindings/profilers/wall.cc
index 5a8dd184..75dba176 100644
--- a/bindings/profilers/wall.cc
+++ b/bindings/profilers/wall.cc
@@ -19,6 +19,7 @@
 #include <memory>
 #include <mutex>
 #include <string>
+#include <type_traits>
 #include <unordered_map>
 #include <vector>
@@ -120,6 +121,18 @@ class ProtectedProfilerMap {
     return profiler;
   }

+  WallProfiler* RemoveProfilerForIsolate(const v8::Isolate* isolate) {
+    return UpdateProfilers([isolate](auto map) {
+      auto it = map->find(isolate);
+      if (it != map->end()) {
+        auto profiler = it->second;
+        map->erase(it);
+        return profiler;
+      }
+      return static_cast<WallProfiler*>(nullptr);
+    });
+  }
+
   bool RemoveProfiler(const v8::Isolate* isolate, WallProfiler* profiler) {
     return UpdateProfilers([isolate, profiler, this](auto map) {
       terminatedWorkersCpu_ += profiler->GetAndResetThreadCpu();
@@ -177,8 +190,10 @@ class ProtectedProfilerMap {
   }

  private:
+  using ProfilerMap = std::unordered_map<const v8::Isolate*, WallProfiler*>;
+
   template <typename F>
-  bool UpdateProfilers(F updateFn) {
+  std::invoke_result_t<F, ProfilerMap*> UpdateProfilers(F updateFn) {
     // use mutex to prevent two isolates of updating profilers concurrently
    std::lock_guard<std::mutex> lock(update_mutex_);
@@ -207,7 +222,6 @@ class ProtectedProfilerMap {
     return res;
   }

-  using ProfilerMap = std::unordered_map<const v8::Isolate*, WallProfiler*>;
   mutable std::atomic<ProfilerMap*> profilers_;
   std::mutex update_mutex_;
   bool init_ = false;
@@ -366,6 +380,27 @@ static int64_t GetV8ToEpochOffset() {
   return V8toEpochOffset;
 }

+void WallProfiler::CleanupHook(void* data) {
+  auto isolate = static_cast<Isolate*>(data);
+  auto prof = g_profilers.RemoveProfilerForIsolate(isolate);
+  if (prof) {
+    prof->Cleanup(isolate);
+    delete prof;
+  }
+}
+
+// This is only called when isolate is terminated without `beforeExit`
+// notification.
+void WallProfiler::Cleanup(Isolate* isolate) {
+  if (started_) {
+    cpuProfiler_->Stop(profileId_);
+    if (interceptSignal()) {
+      SignalHandler::DecreaseUseCount();
+    }
+    Dispose(isolate, false);
+  }
+}
+
 ContextsByNode WallProfiler::GetContextsByNode(CpuProfile* profile,
                                               ContextBuffer& contexts,
                                               int64_t startCpuTime) {
@@ -547,21 +582,22 @@ WallProfiler::WallProfiler(std::chrono::microseconds samplingPeriod,
   }
 }

-WallProfiler::~WallProfiler() {
-  Dispose(nullptr);
-}
-
-void WallProfiler::Dispose(Isolate* isolate) {
+void WallProfiler::Dispose(Isolate* isolate, bool removeFromMap) {
   if (cpuProfiler_ != nullptr) {
     cpuProfiler_->Dispose();
     cpuProfiler_ = nullptr;

-    g_profilers.RemoveProfiler(isolate, this);
+    if (removeFromMap) {
+      g_profilers.RemoveProfiler(isolate, this);
+    }

-    if (isolate != nullptr && collectAsyncId_) {
+    if (collectAsyncId_) {
       isolate->RemoveGCPrologueCallback(&GCPrologueCallback, this);
       isolate->RemoveGCEpilogueCallback(&GCEpilogueCallback, this);
     }
+
+    node::RemoveEnvironmentCleanupHook(
+        isolate, &WallProfiler::CleanupHook, isolate);
   }
 }
@@ -702,17 +738,19 @@ Result WallProfiler::StartImpl() {
                             : CollectionMode::kNoCollect);
   collectionMode_.store(collectionMode, std::memory_order_relaxed);
   started_ = true;
+  auto isolate = Isolate::GetCurrent();
+  node::AddEnvironmentCleanupHook(isolate, &WallProfiler::CleanupHook, isolate);

   return {};
 }

-std::string WallProfiler::StartInternal() {
+v8::ProfilerId WallProfiler::StartInternal() {
   // Reuse the same names for the profiles because strings used for profile
   // names are not released until v8::CpuProfiler object is destroyed.
   // https://github.com/nodejs/node/blob/b53c51995380b1f8d642297d848cab6010d2909c/deps/v8/src/profiler/profile-generator.h#L516
   char buf[128];
   snprintf(buf, sizeof(buf), "pprof-%" PRId64, (profileIdx_++) % 2);
   v8::Local<v8::String> title = Nan::New(buf).ToLocalChecked();
-  cpuProfiler_->StartProfiling(
+  auto result = cpuProfiler_->Start(
       title,
       includeLines_ ? CpuProfilingMode::kCallerLineNumbers
                     : CpuProfilingMode::kLeafNodeLineNumbers,
@@ -752,7 +790,7 @@ std::string WallProfiler::StartInternal() {
     cpuProfiler_->CollectSample(v8::Isolate::GetCurrent());
   }

-  return buf;
+  return result.id;
 }

 NAN_METHOD(WallProfiler::Stop) {
@@ -837,12 +875,11 @@ Result WallProfiler::StopImpl(bool restart, v8::Local<v8::Value>& profile) {
     std::atomic_signal_fence(std::memory_order_acquire);
   }

-  if (withContexts_ || workaroundV8Bug_) {
+  if (interceptSignal()) {
     SignalHandler::DecreaseUseCount();
   }

-  auto v8_profile = cpuProfiler_->StopProfiling(
-      Nan::New(oldProfileId).ToLocalChecked());
+  auto v8_profile = cpuProfiler_->Stop(oldProfileId);

   ContextBuffer contexts;
   if (withContexts_) {
@@ -896,7 +933,7 @@ Result WallProfiler::StopImpl(bool restart, v8::Local<v8::Value>& profile) {
   v8_profile->Delete();

   if (!restart) {
-    Dispose(v8::Isolate::GetCurrent());
+    Dispose(v8::Isolate::GetCurrent(), true);
   } else if (workaroundV8Bug_) {
     waitForSignal(callCount + 1);
     collectionMode_.store(withContexts_ ? CollectionMode::kCollectContexts
@@ -1017,6 +1054,10 @@ NAN_METHOD(WallProfiler::V8ProfilerStuckEventLoopDetected) {

 NAN_METHOD(WallProfiler::Dispose) {
   auto profiler = Nan::ObjectWrap::Unwrap<WallProfiler>(info.This());
+  // Profiler must already be stopped when this is called.
+  if (profiler->started_) {
+    return Nan::ThrowTypeError("Profiler is still running, stop it first.");
+  }
   delete profiler;
 }

diff --git a/bindings/profilers/wall.hh b/bindings/profilers/wall.hh
index f7aac3b9..caea8b5c 100644
--- a/bindings/profilers/wall.hh
+++ b/bindings/profilers/wall.hh
@@ -62,7 +62,7 @@ class WallProfiler : public Nan::ObjectWrap {
   std::atomic<CollectionMode> collectionMode_;
   std::atomic<uint64_t> noCollectCallCount_;

-  std::string profileId_;
+  v8::ProfilerId profileId_;
   uint64_t profileIdx_ = 0;
   bool includeLines_ = false;
   bool withContexts_ = false;
@@ -92,8 +92,8 @@ class WallProfiler : public Nan::ObjectWrap {
   using ContextBuffer = std::vector<SampleContext>;
   ContextBuffer contexts_;

-  ~WallProfiler();
-  void Dispose(v8::Isolate* isolate);
+  ~WallProfiler() = default;
+  void Dispose(v8::Isolate* isolate, bool removeFromMap);

   // A new CPU profiler object will be created each time profiling is started
   // to work around https://bugs.chromium.org/p/v8/issues/detail?id=11051.
@@ -104,6 +104,8 @@ class WallProfiler : public Nan::ObjectWrap {
                                   int64_t startCpuTime);
   bool waitForSignal(uint64_t targetCallCount = 0);
+  static void CleanupHook(void* data);
+  void Cleanup(v8::Isolate* isolate);

  public:
   /**
@@ -129,7 +131,7 @@ class WallProfiler : public Nan::ObjectWrap {
                   int64_t cpu_time,
                   double async_id);
   Result StartImpl();
-  std::string StartInternal();
+  v8::ProfilerId StartInternal();
   Result StopImpl(bool restart, v8::Local<v8::Value>& profile);

   CollectionMode collectionMode() {
@@ -143,6 +145,8 @@ class WallProfiler : public Nan::ObjectWrap {
   bool collectCpuTime() const { return collectCpuTime_; }

+  bool interceptSignal() const { return withContexts_ || workaroundV8Bug_; }
+
   int v8ProfilerStuckEventLoopDetected() const {
     return v8ProfilerStuckEventLoopDetected_;
   }
diff --git a/ts/test/test-worker-threads.ts b/ts/test/test-worker-threads.ts
index d36096b9..d659da02 100644
--- a/ts/test/test-worker-threads.ts
+++ b/ts/test/test-worker-threads.ts
@@ -1,6 +1,7 @@
 // eslint-disable-next-line node/no-unsupported-features/node-builtins
 import {execFile} from 'child_process';
 import {promisify} from 'util';
+import {Worker} from 'worker_threads';

 const exec = promisify(execFile);

@@ -11,4 +12,34 @@ describe('Worker Threads', () => {
     const nbWorkers = 2;
     return exec('node', ['./out/test/worker.js', String(nbWorkers)]);
   });
+
+  it('should not crash when worker is terminated', async function () {
+    this.timeout(30000);
+    const nruns = 5;
+    const concurrentWorkers = 20;
+    for (let i = 0; i < nruns; i++) {
+      const workers = [];
+      for (let j = 0; j < concurrentWorkers; j++) {
+        const worker = new Worker('./out/test/worker2.js');
+        worker.postMessage('hello');
+
+        worker.on('message', () => {
+          worker.terminate();
+        });
+
+        workers.push(
+          new Promise<void>((resolve, reject) => {
+            worker.on('exit', exitCode => {
+              if (exitCode === 1) {
+                resolve();
+              } else {
+                reject(new Error('Worker exited with code 0'));
+              }
+            });
+          })
+        );
+      }
+      await Promise.all(workers);
+    }
+  });
 });
diff --git a/ts/test/worker2.ts b/ts/test/worker2.ts
new file mode 100644
index 00000000..3c2c55da
--- /dev/null
+++ b/ts/test/worker2.ts
@@ -0,0 +1,23 @@
+import {parentPort} from 'node:worker_threads';
+import {time} from '../src/index';
+
+const delay = (ms: number) => new Promise(res => setTimeout(res, ms));
+
+const DURATION_MILLIS = 1000;
+const INTERVAL_MICROS = 10000;
+const withContexts =
+  process.platform === 'darwin' || process.platform === 'linux';
+
+time.start({
+  durationMillis: DURATION_MILLIS,
+  intervalMicros: INTERVAL_MICROS,
+  withContexts: withContexts,
+  collectCpuTime: withContexts,
+  collectAsyncId: false,
+});
+
+parentPort?.on('message', () => {
+  delay(50).then(() => {
+    parentPort?.postMessage('hello');
+  });
+});