From 630c005b75d3d5d21b4066b031cde23861777c80 Mon Sep 17 00:00:00 2001 From: legendecas Date: Fri, 11 Oct 2019 19:49:16 +0800 Subject: [PATCH 1/8] Implement AsyncProgressWorkerBase --- napi-inl.h | 89 +++++++++++++++++++++++++++++++++++++++--------------- napi.h | 59 +++++++++++++++++++++++++++++++++--- 2 files changed, 119 insertions(+), 29 deletions(-) diff --git a/napi-inl.h b/napi-inl.h index 2855e80ef..1c0fb293f 100644 --- a/napi-inl.h +++ b/napi-inl.h @@ -4587,9 +4587,59 @@ inline void ThreadSafeFunction::CallJS(napi_env env, } //////////////////////////////////////////////////////////////////////////////// -// Async Progress Worker class +// Async Progress Worker Base class //////////////////////////////////////////////////////////////////////////////// +template +inline AsyncProgressWorkerBase::AsyncProgressWorkerBase(const Object& receiver, + const Function& callback, + const char* resource_name, + const Object& resource, + size_t queue_size) + : AsyncWorker(receiver, callback, resource_name, resource) { + // Fill all possible arguments to work around from ambiguous ThreadSafeFunction::New signatures. + _tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource, resource_name, queue_size, 1, this, Finalizer, this); +} + +#if NAPI_VERSION > 4 +template +inline AsyncProgressWorkerBase::AsyncProgressWorkerBase(Napi::Env env, + const char* resource_name, + const Object& resource, + size_t queue_size) + : AsyncWorker(env, resource_name, resource) { + // TODO: Once the changes to make the callback optional for threadsafe + // functions are no longer optional we can remove the dummy Function here. + Function callback; + // Fill all possible arguments to work around from ambiguous ThreadSafeFunction::New signatures. + _tsfn = ThreadSafeFunction::New(env, callback, resource, resource_name, queue_size, 1, this, Finalizer, this); +} +#endif + +template +inline AsyncProgressWorkerBase::~AsyncProgressWorkerBase() { + // Abort pending tsfn call. + // Don't send progress events after we've already completed. + this->_tsfn.Abort(); + this->_tsfn.Release(); +} +template +inline void AsyncProgressWorkerBase::OnAsyncWorkProgress(Napi::Env /* env */, + Napi::Function /* jsCallback */, + void* data) { + ThreadSafeData* tsd = static_cast(data); + tsd->asyncprogressworker()->OnWorkProgress(tsd->data()); +} + +template +inline void AsyncProgressWorkerBase::NonBlockingCall(DataType* data) { + auto tsd = new AsyncProgressWorkerBase::ThreadSafeData(this, data); + _tsfn.NonBlockingCall(tsd, OnAsyncWorkProgress); +} + +//////////////////////////////////////////////////////////////////////////////// +// Async Progress Worker class +//////////////////////////////////////////////////////////////////////////////// template inline AsyncProgressWorker::AsyncProgressWorker(const Function& callback) : AsyncProgressWorker(callback, "generic") { @@ -4632,10 +4682,9 @@ inline AsyncProgressWorker::AsyncProgressWorker(const Object& receiver, const Function& callback, const char* resource_name, const Object& resource) - : AsyncWorker(receiver, callback, resource_name, resource), + : AsyncProgressWorkerBase(receiver, callback, resource_name, resource), _asyncdata(nullptr), _asyncsize(0) { - _tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource_name, 1, 1); } #if NAPI_VERSION > 4 @@ -4654,27 +4703,19 @@ template inline AsyncProgressWorker::AsyncProgressWorker(Napi::Env env, const char* resource_name, const Object& resource) - : AsyncWorker(env, resource_name, resource), + : AsyncProgressWorkerBase(env, resource_name, resource), _asyncdata(nullptr), _asyncsize(0) { - // TODO: Once the changes to make the callback optional for threadsafe - // functions are no longer optional we can remove the dummy Function here. - Function callback; - _tsfn = ThreadSafeFunction::New(env, callback, resource_name, 1, 1); } #endif template inline AsyncProgressWorker::~AsyncProgressWorker() { - // Abort pending tsfn call. - // Don't send progress events after we've already completed. - _tsfn.Abort(); { - std::lock_guard lock(_mutex); + std::lock_guard lock(this->_mutex); _asyncdata = nullptr; _asyncsize = 0; } - _tsfn.Release(); } template @@ -4684,20 +4725,18 @@ inline void AsyncProgressWorker::Execute() { } template -inline void AsyncProgressWorker::WorkProgress_(Napi::Env /* env */, Napi::Function /* jsCallback */, void* _data) { - AsyncProgressWorker* self = static_cast(_data); - +inline void AsyncProgressWorker::OnWorkProgress(void*) { T* data; size_t size; { - std::lock_guard lock(self->_mutex); - data = self->_asyncdata; - size = self->_asyncsize; - self->_asyncdata = nullptr; - self->_asyncsize = 0; + std::lock_guard lock(this->_mutex); + data = this->_asyncdata; + size = this->_asyncsize; + this->_asyncdata = nullptr; + this->_asyncsize = 0; } - self->OnProgress(data, size); + this->OnProgress(data, size); delete[] data; } @@ -4708,19 +4747,19 @@ inline void AsyncProgressWorker::SendProgress_(const T* data, size_t count) { T* old_data; { - std::lock_guard lock(_mutex); + std::lock_guard lock(this->_mutex); old_data = _asyncdata; _asyncdata = new_data; _asyncsize = count; } - _tsfn.NonBlockingCall(this, WorkProgress_); + this->NonBlockingCall(nullptr); delete[] old_data; } template inline void AsyncProgressWorker::Signal() const { - _tsfn.NonBlockingCall(this, WorkProgress_); + this->NonBlockingCall(nullptr); } template diff --git a/napi.h b/napi.h index 49435d59b..a5e326b43 100644 --- a/napi.h +++ b/napi.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -1966,6 +1967,12 @@ namespace Napi { napi_async_context _context; }; + + inline void OnAsyncWorkExecute(napi_env env, void* asyncworker); + inline void OnAsyncWorkComplete(napi_env env, + napi_status status, + void* asyncworker); + class AsyncWorker { public: virtual ~AsyncWorker(); @@ -2242,8 +2249,53 @@ namespace Napi { napi_threadsafe_function _tsfn; }; + template + class AsyncProgressWorkerBase : public AsyncWorker { + public: + virtual void OnWorkProgress(DataType* data) = 0; + class ThreadSafeData { + public: + ThreadSafeData(AsyncProgressWorkerBase* asyncprogressworker, DataType* data) + : _asyncprogressworker(asyncprogressworker), _data(data) {} + + AsyncProgressWorkerBase* asyncprogressworker() { return _asyncprogressworker; }; + DataType* data() { return _data; }; + + private: + AsyncProgressWorkerBase* _asyncprogressworker; + DataType* _data; + }; + protected: + explicit AsyncProgressWorkerBase(const Object& receiver, + const Function& callback, + const char* resource_name, + const Object& resource, + size_t queue_size = 1); + virtual ~AsyncProgressWorkerBase(); + +// Optional callback of Napi::ThreadSafeFunction only available after NAPI_VERSION 4. +// Refs: https://github.com/nodejs/node/pull/27791 +#if NAPI_VERSION > 4 + explicit AsyncProgressWorkerBase(Napi::Env env, + const char* resource_name, + const Object& resource, + size_t queue_size = 1); +#endif + + static inline void OnAsyncWorkProgress(Napi::Env env, + Napi::Function jsCallback, + void* data); + + + void NonBlockingCall(DataType* data); + + private: + ThreadSafeFunction _tsfn; + static inline void Finalizer(Napi::Env env, void* data, AsyncProgressWorkerBase* context) {}; + }; + template - class AsyncProgressWorker : public AsyncWorker { + class AsyncProgressWorker : public AsyncProgressWorkerBase { public: virtual ~AsyncProgressWorker(); @@ -2257,6 +2309,8 @@ namespace Napi { AsyncProgressWorker* const _worker; }; + void OnWorkProgress(void*) override; + protected: explicit AsyncProgressWorker(const Function& callback); explicit AsyncProgressWorker(const Function& callback, @@ -2288,8 +2342,6 @@ namespace Napi { virtual void OnProgress(const T* data, size_t count) = 0; private: - static void WorkProgress_(Napi::Env env, Napi::Function jsCallback, void* data); - void Execute() override; void Signal() const; void SendProgress_(const T* data, size_t count); @@ -2297,7 +2349,6 @@ namespace Napi { std::mutex _mutex; T* _asyncdata; size_t _asyncsize; - ThreadSafeFunction _tsfn; }; #endif From 01d157cc3de15f40446a66129c4d0eea4f318752 Mon Sep 17 00:00:00 2001 From: legendecas Date: Sun, 26 Jan 2020 23:54:44 +0800 Subject: [PATCH 2/8] Implement AsyncProgressQueueWorker --- napi-inl.h | 117 +++++++++++++++++++++++++++++++ napi.h | 54 ++++++++++++++ test/asyncprogressqueueworker.cc | 78 +++++++++++++++++++++ test/asyncprogressqueueworker.js | 42 +++++++++++ test/binding.cc | 2 + test/binding.gyp | 1 + test/index.js | 2 + 7 files changed, 296 insertions(+) create mode 100644 test/asyncprogressqueueworker.cc create mode 100644 test/asyncprogressqueueworker.js diff --git a/napi-inl.h b/napi-inl.h index 1c0fb293f..425864454 100644 --- a/napi-inl.h +++ b/napi-inl.h @@ -4771,6 +4771,123 @@ template inline void AsyncProgressWorker::ExecutionProgress::Send(const T* data, size_t count) const { _worker->SendProgress_(data, count); } + +//////////////////////////////////////////////////////////////////////////////// +// Async Progress Queue Worker class +//////////////////////////////////////////////////////////////////////////////// +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(const Function& callback) + : AsyncProgressQueueWorker(callback, "generic") { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(const Function& callback, + const char* resource_name) + : AsyncProgressQueueWorker(callback, resource_name, Object::New(callback.Env())) { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(const Function& callback, + const char* resource_name, + const Object& resource) + : AsyncProgressQueueWorker(Object::New(callback.Env()), + callback, + resource_name, + resource) { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(const Object& receiver, + const Function& callback) + : AsyncProgressQueueWorker(receiver, callback, "generic") { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(const Object& receiver, + const Function& callback, + const char* resource_name) + : AsyncProgressQueueWorker(receiver, + callback, + resource_name, + Object::New(callback.Env())) { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(const Object& receiver, + const Function& callback, + const char* resource_name, + const Object& resource) + : AsyncProgressWorkerBase>(receiver, callback, resource_name, resource, /** unlimited queue size */0) { +} + +#if NAPI_VERSION > 4 +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(Napi::Env env) + : AsyncProgressQueueWorker(env, "generic") { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(Napi::Env env, + const char* resource_name) + : AsyncProgressQueueWorker(env, resource_name, Object::New(env)) { +} + +template +inline AsyncProgressQueueWorker::AsyncProgressQueueWorker(Napi::Env env, + const char* resource_name, + const Object& resource) + : AsyncProgressWorkerBase>(env, resource_name, resource, /** unlimited queue size */0) { +} +#endif + +template +inline void AsyncProgressQueueWorker::Execute() { + ExecutionProgress progress(this); + Execute(progress); +} + +template +inline void AsyncProgressQueueWorker::OnWorkProgress(std::pair* datapair) { + if (datapair == nullptr) { + return; + } + + T *data = datapair->first; + size_t size = datapair->second; + + this->OnProgress(data, size); + delete datapair; + delete[] data; +} + +template +inline void AsyncProgressQueueWorker::SendProgress_(const T* data, size_t count) { + T* new_data = new T[count]; + std::copy(data, data + count, new_data); + + auto pair = new std::pair(new_data, count); + this->NonBlockingCall(pair); +} + +template +inline void AsyncProgressQueueWorker::Signal() const { + this->NonBlockingCall(nullptr); +} + +template +inline void AsyncProgressQueueWorker::OnWorkComplete(Napi::Env env, napi_status status) { + AsyncWorker::OnWorkComplete(env, status); +} + +template +inline void AsyncProgressQueueWorker::ExecutionProgress::Signal() const { + _worker->Signal(); +} + +template +inline void AsyncProgressQueueWorker::ExecutionProgress::Send(const T* data, size_t count) const { + _worker->SendProgress_(data, count); +} #endif //////////////////////////////////////////////////////////////////////////////// diff --git a/napi.h b/napi.h index a5e326b43..1f5bc8a61 100644 --- a/napi.h +++ b/napi.h @@ -2350,6 +2350,60 @@ namespace Napi { T* _asyncdata; size_t _asyncsize; }; + + template + class AsyncProgressQueueWorker : public AsyncProgressWorkerBase> { + public: + virtual ~AsyncProgressQueueWorker() {}; + + class ExecutionProgress { + friend class AsyncProgressQueueWorker; + public: + void Signal() const; + void Send(const T* data, size_t count) const; + private: + explicit ExecutionProgress(AsyncProgressQueueWorker* worker) : _worker(worker) {} + AsyncProgressQueueWorker* const _worker; + }; + + void OnWorkComplete(Napi::Env env, napi_status status) override; + void OnWorkProgress(std::pair*) override; + + protected: + explicit AsyncProgressQueueWorker(const Function& callback); + explicit AsyncProgressQueueWorker(const Function& callback, + const char* resource_name); + explicit AsyncProgressQueueWorker(const Function& callback, + const char* resource_name, + const Object& resource); + explicit AsyncProgressQueueWorker(const Object& receiver, + const Function& callback); + explicit AsyncProgressQueueWorker(const Object& receiver, + const Function& callback, + const char* resource_name); + explicit AsyncProgressQueueWorker(const Object& receiver, + const Function& callback, + const char* resource_name, + const Object& resource); + +// Optional callback of Napi::ThreadSafeFunction only available after NAPI_VERSION 4. +// Refs: https://github.com/nodejs/node/pull/27791 +#if NAPI_VERSION > 4 + explicit AsyncProgressQueueWorker(Napi::Env env); + explicit AsyncProgressQueueWorker(Napi::Env env, + const char* resource_name); + explicit AsyncProgressQueueWorker(Napi::Env env, + const char* resource_name, + const Object& resource); +#endif + virtual void Execute(const ExecutionProgress& progress) = 0; + virtual void OnProgress(const T* data, size_t count) = 0; + + private: + void Execute() override; + void Signal() const; + void SendProgress_(const T* data, size_t count); + }; #endif // Memory management. diff --git a/test/asyncprogressqueueworker.cc b/test/asyncprogressqueueworker.cc new file mode 100644 index 000000000..aa6f4d8d2 --- /dev/null +++ b/test/asyncprogressqueueworker.cc @@ -0,0 +1,78 @@ +#include "napi.h" + +#include +#include +#include +#include + +#if (NAPI_VERSION > 3) + +using namespace Napi; + +namespace { + +struct ProgressData { + int32_t progress; +}; + +class TestWorker : public AsyncProgressQueueWorker { +public: + static void DoWork(const CallbackInfo& info) { + int32_t times = info[0].As().Int32Value(); + Function cb = info[1].As(); + Function progress = info[2].As(); + + TestWorker* worker = new TestWorker(cb, progress, "TestResource", Object::New(info.Env())); + worker->_times = times; + worker->Queue(); + } + +protected: + void Execute(const ExecutionProgress& progress) override { + if (_times < 0) { + SetError("test error"); + } + ProgressData data{0}; + for (int32_t idx = 0; idx < _times; idx++) { + data.progress = idx; + progress.Send(&data, 1); + } + // keep worker alive until we processed all progress. + if (_times > 0) { + std::unique_lock lock(_cvm); + _cv.wait(lock); + } + } + + void OnProgress(const ProgressData* data, size_t /* count */) override { + Napi::Env env = Env(); + if (!_progress.IsEmpty()) { + Number progress = Number::New(env, data->progress); + _progress.MakeCallback(Receiver().Value(), { progress }); + } + if (data->progress + 1 == _times) { + _cv.notify_one(); + } + } + +private: + TestWorker(Function cb, Function progress, const char* resource_name, const Object& resource) + : AsyncProgressQueueWorker(cb, resource_name, resource) { + _progress.Reset(progress, 1); + } + + std::condition_variable _cv; + std::mutex _cvm; + int32_t _times; + FunctionReference _progress; +}; + +} + +Object InitAsyncProgressQueueWorker(Env env) { + Object exports = Object::New(env); + exports["doWork"] = Function::New(env, TestWorker::DoWork); + return exports; +} + +#endif diff --git a/test/asyncprogressqueueworker.js b/test/asyncprogressqueueworker.js new file mode 100644 index 000000000..217b49e11 --- /dev/null +++ b/test/asyncprogressqueueworker.js @@ -0,0 +1,42 @@ +'use strict'; +const buildType = process.config.target_defaults.default_configuration; +const common = require('./common') +const assert = require('assert'); + +test(require(`./build/${buildType}/binding.node`)); +test(require(`./build/${buildType}/binding_noexcept.node`)); + +function test({ asyncprogressqueueworker }) { + success(asyncprogressqueueworker); + fail(asyncprogressqueueworker); + return; +} + +function success(binding) { + const expected = [0, 1, 2, 3]; + const actual = []; + binding.doWork(expected.length, + common.mustCall((err) => { + if (err) { + assert.fail(err); + } + }), + common.mustCall((_progress) => { + actual.push(_progress); + if (actual.length === expected.length) { + assert.deepEqual(actual, expected); + } + }, expected.length) + ); +} + +function fail(binding) { + binding.doWork(-1, + common.mustCall((err) => { + assert.throws(() => { throw err }, /test error/) + }), + () => { + assert.fail('unexpected progress report'); + } + ); +} diff --git a/test/binding.cc b/test/binding.cc index aa9db6e41..111bcce01 100644 --- a/test/binding.cc +++ b/test/binding.cc @@ -5,6 +5,7 @@ using namespace Napi; Object InitArrayBuffer(Env env); Object InitAsyncContext(Env env); #if (NAPI_VERSION > 3) +Object InitAsyncProgressQueueWorker(Env env); Object InitAsyncProgressWorker(Env env); #endif Object InitAsyncWorker(Env env); @@ -60,6 +61,7 @@ Object Init(Env env, Object exports) { exports.Set("arraybuffer", InitArrayBuffer(env)); exports.Set("asynccontext", InitAsyncContext(env)); #if (NAPI_VERSION > 3) + exports.Set("asyncprogressqueueworker", InitAsyncProgressQueueWorker(env)); exports.Set("asyncprogressworker", InitAsyncProgressWorker(env)); #endif exports.Set("asyncworker", InitAsyncWorker(env)); diff --git a/test/binding.gyp b/test/binding.gyp index b6777808d..2d6ac9549 100644 --- a/test/binding.gyp +++ b/test/binding.gyp @@ -4,6 +4,7 @@ 'sources': [ 'arraybuffer.cc', 'asynccontext.cc', + 'asyncprogressqueueworker.cc', 'asyncprogressworker.cc', 'asyncworker.cc', 'asyncworker-persistent.cc', diff --git a/test/index.js b/test/index.js index 1bd1d9144..e96ac5bf8 100644 --- a/test/index.js +++ b/test/index.js @@ -10,6 +10,7 @@ process.config.target_defaults.default_configuration = let testModules = [ 'arraybuffer', 'asynccontext', + 'asyncprogressqueueworker', 'asyncprogressworker', 'asyncworker', 'asyncworker-nocallback', @@ -72,6 +73,7 @@ if (napiVersion < 3) { } if (napiVersion < 4) { + testModules.splice(testModules.indexOf('asyncprogressqueueworker'), 1); testModules.splice(testModules.indexOf('asyncprogressworker'), 1); testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_ctx'), 1); testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_existing_tsfn'), 1); From c558b42c15eb88e1bd08da53f23c9802f21f147a Mon Sep 17 00:00:00 2001 From: legendecas Date: Sun, 2 Feb 2020 23:40:52 +0800 Subject: [PATCH 3/8] asyncworker: wait for tsfn completing --- napi-inl.h | 25 ++++++++++++++++++++----- napi.h | 6 ++++-- test/asyncprogressqueueworker.cc | 26 +++++++++++++------------- test/asyncprogressqueueworker.js | 5 ++--- 4 files changed, 39 insertions(+), 23 deletions(-) diff --git a/napi-inl.h b/napi-inl.h index 425864454..8ec445382 100644 --- a/napi-inl.h +++ b/napi-inl.h @@ -4597,7 +4597,7 @@ inline AsyncProgressWorkerBase::AsyncProgressWorkerBase(const Object& size_t queue_size) : AsyncWorker(receiver, callback, resource_name, resource) { // Fill all possible arguments to work around from ambiguous ThreadSafeFunction::New signatures. - _tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource, resource_name, queue_size, 1, this, Finalizer, this); + _tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource, resource_name, queue_size, 1, this, OnThreadSafeFunctionFinalize, this); } #if NAPI_VERSION > 4 @@ -4611,7 +4611,7 @@ inline AsyncProgressWorkerBase::AsyncProgressWorkerBase(Napi::Env env, // functions are no longer optional we can remove the dummy Function here. Function callback; // Fill all possible arguments to work around from ambiguous ThreadSafeFunction::New signatures. - _tsfn = ThreadSafeFunction::New(env, callback, resource, resource_name, queue_size, 1, this, Finalizer, this); + _tsfn = ThreadSafeFunction::New(env, callback, resource, resource_name, queue_size, 1, this, OnThreadSafeFunctionFinalize, this); } #endif @@ -4619,8 +4619,8 @@ template inline AsyncProgressWorkerBase::~AsyncProgressWorkerBase() { // Abort pending tsfn call. // Don't send progress events after we've already completed. - this->_tsfn.Abort(); - this->_tsfn.Release(); + // It's ok to call ThreadSafeFunction::Abort and ThreadSafeFunction::Release duplicated. + _tsfn.Abort(); } template @@ -4637,6 +4637,20 @@ inline void AsyncProgressWorkerBase::NonBlockingCall(DataType* data) { _tsfn.NonBlockingCall(tsd, OnAsyncWorkProgress); } +template +inline void AsyncProgressWorkerBase::OnWorkComplete(Napi::Env env, napi_status status) { + _work_completed = true; + _complete_status = status; + _tsfn.Release(); +} + +template +inline void AsyncProgressWorkerBase::OnThreadSafeFunctionFinalize(Napi::Env env, void* data, AsyncProgressWorkerBase* context) { + if (context->_work_completed) { + context->AsyncWorker::OnWorkComplete(env, context->_complete_status); + } +} + //////////////////////////////////////////////////////////////////////////////// // Async Progress Worker class //////////////////////////////////////////////////////////////////////////////// @@ -4876,7 +4890,8 @@ inline void AsyncProgressQueueWorker::Signal() const { template inline void AsyncProgressQueueWorker::OnWorkComplete(Napi::Env env, napi_status status) { - AsyncWorker::OnWorkComplete(env, status); + // Draining queued items in TSFN. + AsyncProgressWorkerBase>::OnWorkComplete(env, status); } template diff --git a/napi.h b/napi.h index 1f5bc8a61..fdb172d82 100644 --- a/napi.h +++ b/napi.h @@ -2265,6 +2265,7 @@ namespace Napi { AsyncProgressWorkerBase* _asyncprogressworker; DataType* _data; }; + void OnWorkComplete(Napi::Env env, napi_status status) override; protected: explicit AsyncProgressWorkerBase(const Object& receiver, const Function& callback, @@ -2286,12 +2287,13 @@ namespace Napi { Napi::Function jsCallback, void* data); - void NonBlockingCall(DataType* data); private: ThreadSafeFunction _tsfn; - static inline void Finalizer(Napi::Env env, void* data, AsyncProgressWorkerBase* context) {}; + bool _work_completed = false; + napi_status _complete_status; + static inline void OnThreadSafeFunctionFinalize(Napi::Env env, void* data, AsyncProgressWorkerBase* context); }; template diff --git a/test/asyncprogressqueueworker.cc b/test/asyncprogressqueueworker.cc index aa6f4d8d2..b8eb33004 100644 --- a/test/asyncprogressqueueworker.cc +++ b/test/asyncprogressqueueworker.cc @@ -17,7 +17,7 @@ struct ProgressData { class TestWorker : public AsyncProgressQueueWorker { public: - static void DoWork(const CallbackInfo& info) { + static Napi::Value DoWork(const CallbackInfo& info) { int32_t times = info[0].As().Int32Value(); Function cb = info[1].As(); Function progress = info[2].As(); @@ -25,6 +25,15 @@ class TestWorker : public AsyncProgressQueueWorker { TestWorker* worker = new TestWorker(cb, progress, "TestResource", Object::New(info.Env())); worker->_times = times; worker->Queue(); + + return Napi::External::New(info.Env(), worker); + } + + static Napi::Value CancelWork(const CallbackInfo& info) { + auto wrap = info[0].As>(); + auto worker = wrap.Data(); + worker->Cancel(); + return Napi::Boolean::New(info.Env(), true); } protected: @@ -37,11 +46,6 @@ class TestWorker : public AsyncProgressQueueWorker { data.progress = idx; progress.Send(&data, 1); } - // keep worker alive until we processed all progress. - if (_times > 0) { - std::unique_lock lock(_cvm); - _cv.wait(lock); - } } void OnProgress(const ProgressData* data, size_t /* count */) override { @@ -50,28 +54,24 @@ class TestWorker : public AsyncProgressQueueWorker { Number progress = Number::New(env, data->progress); _progress.MakeCallback(Receiver().Value(), { progress }); } - if (data->progress + 1 == _times) { - _cv.notify_one(); - } } private: TestWorker(Function cb, Function progress, const char* resource_name, const Object& resource) - : AsyncProgressQueueWorker(cb, resource_name, resource) { + : AsyncProgressQueueWorker(cb, resource_name, resource) { _progress.Reset(progress, 1); } - std::condition_variable _cv; - std::mutex _cvm; int32_t _times; FunctionReference _progress; }; -} +} // namespace Object InitAsyncProgressQueueWorker(Env env) { Object exports = Object::New(env); exports["doWork"] = Function::New(env, TestWorker::DoWork); + exports["cancelWork"] = Function::New(env, TestWorker::CancelWork); return exports; } diff --git a/test/asyncprogressqueueworker.js b/test/asyncprogressqueueworker.js index 217b49e11..5bc61d672 100644 --- a/test/asyncprogressqueueworker.js +++ b/test/asyncprogressqueueworker.js @@ -20,12 +20,11 @@ function success(binding) { if (err) { assert.fail(err); } + // All queued items shall be invoked before complete callback. + assert.deepEqual(actual, expected); }), common.mustCall((_progress) => { actual.push(_progress); - if (actual.length === expected.length) { - assert.deepEqual(actual, expected); - } }, expected.length) ); } From 95890afb928099eb7ed6d539c25ad631b531dc20 Mon Sep 17 00:00:00 2001 From: legendecas Date: Mon, 3 Feb 2020 00:31:48 +0800 Subject: [PATCH 4/8] test: cancel a queue worker --- test/asyncprogressqueueworker.cc | 15 ++++++++++++--- test/asyncprogressqueueworker.js | 19 +++++++++++++++++-- 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/test/asyncprogressqueueworker.cc b/test/asyncprogressqueueworker.cc index b8eb33004..849a861c5 100644 --- a/test/asyncprogressqueueworker.cc +++ b/test/asyncprogressqueueworker.cc @@ -17,21 +17,29 @@ struct ProgressData { class TestWorker : public AsyncProgressQueueWorker { public: - static Napi::Value DoWork(const CallbackInfo& info) { + static Napi::Value CreateWork(const CallbackInfo& info) { int32_t times = info[0].As().Int32Value(); Function cb = info[1].As(); Function progress = info[2].As(); TestWorker* worker = new TestWorker(cb, progress, "TestResource", Object::New(info.Env())); worker->_times = times; - worker->Queue(); return Napi::External::New(info.Env(), worker); } + static Napi::Value QueueWork(const CallbackInfo& info) { + auto wrap = info[0].As>(); + auto worker = wrap.Data(); + worker->Queue(); + return Napi::Boolean::New(info.Env(), true); + } + static Napi::Value CancelWork(const CallbackInfo& info) { auto wrap = info[0].As>(); auto worker = wrap.Data(); + // We cannot cancel a worker if it got started. So we have to do a quick cancel. + worker->Queue(); worker->Cancel(); return Napi::Boolean::New(info.Env(), true); } @@ -70,7 +78,8 @@ class TestWorker : public AsyncProgressQueueWorker { Object InitAsyncProgressQueueWorker(Env env) { Object exports = Object::New(env); - exports["doWork"] = Function::New(env, TestWorker::DoWork); + exports["createWork"] = Function::New(env, TestWorker::CreateWork); + exports["queueWork"] = Function::New(env, TestWorker::QueueWork); exports["cancelWork"] = Function::New(env, TestWorker::CancelWork); return exports; } diff --git a/test/asyncprogressqueueworker.js b/test/asyncprogressqueueworker.js index 5bc61d672..d21043f0f 100644 --- a/test/asyncprogressqueueworker.js +++ b/test/asyncprogressqueueworker.js @@ -9,13 +9,14 @@ test(require(`./build/${buildType}/binding_noexcept.node`)); function test({ asyncprogressqueueworker }) { success(asyncprogressqueueworker); fail(asyncprogressqueueworker); + cancel(asyncprogressqueueworker); return; } function success(binding) { const expected = [0, 1, 2, 3]; const actual = []; - binding.doWork(expected.length, + const worker = binding.createWork(expected.length, common.mustCall((err) => { if (err) { assert.fail(err); @@ -27,10 +28,11 @@ function success(binding) { actual.push(_progress); }, expected.length) ); + assert.strictEqual(binding.queueWork(worker), true); } function fail(binding) { - binding.doWork(-1, + const worker = binding.createWork(-1, common.mustCall((err) => { assert.throws(() => { throw err }, /test error/) }), @@ -38,4 +40,17 @@ function fail(binding) { assert.fail('unexpected progress report'); } ); + assert.strictEqual(binding.queueWork(worker), true); +} + +function cancel(binding) { + const worker = binding.createWork(-1, + () => { + assert.fail('unexpected callback'); + }, + () => { + assert.fail('unexpected progress report'); + } + ); + binding.cancelWork(worker); } From 0ced77153cc9f16b5b510be41fc7f9631d61046d Mon Sep 17 00:00:00 2001 From: legendecas Date: Mon, 3 Feb 2020 22:51:40 +0800 Subject: [PATCH 5/8] doc: async progress queue worker --- ...ess_worker.md => async_worker_variants.md} | 116 +++++++++++++++++- 1 file changed, 113 insertions(+), 3 deletions(-) rename doc/{async_progress_worker.md => async_worker_variants.md} (76%) diff --git a/doc/async_progress_worker.md b/doc/async_worker_variants.md similarity index 76% rename from doc/async_progress_worker.md rename to doc/async_worker_variants.md index 296b51b7d..4a9a83404 100644 --- a/doc/async_progress_worker.md +++ b/doc/async_worker_variants.md @@ -272,12 +272,12 @@ called and are executed as part of the event loop. The code below shows a basic example of the `Napi::AsyncProgressWorker` implementation: ```cpp -#include +#include #include #include -use namespace Napi; +using namespace Napi; class EchoWorker : public AsyncProgressWorker { public: @@ -323,7 +323,7 @@ The following code shows an example of how to create and use an `Napi::AsyncProg // Include EchoWorker class // .. -use namespace Napi; +using namespace Napi; Value Echo(const CallbackInfo& info) { // We need to validate the arguments here @@ -341,4 +341,114 @@ asynchronous task ends and other data needed for the computation. Once created, the only other action needed is to call the `Napi::AsyncProgressWorker::Queue` method that will queue the created worker for execution. +# AsyncProgressQueueWorker + +`Napi::AsyncProgressQueueWorker` acts exactly like `Napi::AsyncProgressWorker` +except that each progress commited by `Napi::AsyncProgressQueueWorker::ExecutionProgress::Send` +during `Napi::AsyncProgressQueueWorker::Execute` is guaranteed to be +processed by `Napi::AsyncProgressQueueWorker::OnProgress` on JavaScript thread +by committing order order. + +For the most basic use, only the `Napi::AsyncProgressQueueWorker::Execute` and +`Napi::AsyncProgressQueueWorker::OnProgress` method must be implemented in a subclass. + +# AsyncProgressQueueWorker::ExecutionProcess + +A bridge class created before the worker thread execution of `Napi::AsyncProgressQueueWorker::Execute`. + +## Methods + +### Send + +`Napi::AsyncProgressQueueWorker::ExecutionProcess::Send` takes two arguments, a pointer +to a generic type of data, and a `size_t` to indicate how many items the pointer is +pointing to. + +The data pointed to will be copied to internal slots of `Napi::AsyncProgressQueueWorker` so +after the call to `Napi::AsyncProgressQueueWorker::ExecutionProcess::Send` the data can +be safely released. + +`Napi::AsyncProgressQueueWorker::ExecutionProcess::Send` guarantees **eventual** +invocation of `Napi::AsyncProgressQueueWorker::OnProgress`, which means +multiple send will be cast to orderly invocation of `Napi::AsyncProgressQueueWorker::OnProgress` +with each data. + +```cpp +void Napi::AsyncProgressQueueWorker::ExecutionProcess::Send(const T* data, size_t count) const; +``` + +## Example + +The code below shows a basic example of the `Napi::AsyncProgressQueueWorker` implementation: + +```cpp +#include + +#include +#include + +using namespace Napi; + +class EchoWorker : public AsyncProgressQueueWorker { + public: + EchoWorker(Function& callback, std::string& echo) + : AsyncProgressQueueWorker(callback), echo(echo) {} + + ~EchoWorker() {} + // This code will be executed on the worker thread + void Execute(const ExecutionProgress& progress) { + // Need to simulate cpu heavy task + for (uint32_t i = 0; i < 100; ++i) { + progress.Send(&i, 1) + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + } + + void OnOK() { + HandleScope scope(Env()); + Callback().Call({Env().Null(), String::New(Env(), echo)}); + } + + void OnProgress(const uint32_t* data, size_t /* count */) { + HandleScope scope(Env()); + Callback().Call({Env().Null(), Env().Null(), Number::New(Env(), *data)}); + } + + private: + std::string echo; +}; +``` + +The `EchoWorker`'s constructor calls the base class' constructor to pass in the +callback that the `Napi::AsyncProgressQueueWorker` base class will store persistently. When +the work on the `Napi::AsyncProgressQueueWorker::Execute` method is done the +`Napi::AsyncProgressQueueWorker::OnOk` method is called and the results are return back to +JavaScript when the stored callback is invoked with its associated environment. + +The following code shows an example of how to create and use an `Napi::AsyncProgressQueueWorker` + +```cpp +#include + +// Include EchoWorker class +// .. + +using namespace Napi; + +Value Echo(const CallbackInfo& info) { + // We need to validate the arguments here + Function cb = info[1].As(); + std::string in = info[0].As(); + EchoWorker* wk = new EchoWorker(cb, in); + wk->Queue(); + return info.Env().Undefined(); +} +``` + +The implementation of a `Napi::AsyncProgressQueueWorker` can be used by creating a +new instance and passing to its constructor the callback to execute when the +asynchronous task ends and other data needed for the computation. Once created, +the only other action needed is to call the `Napi::AsyncProgressQueueWorker::Queue` +method that will queue the created worker for execution. + [`Napi::AsyncWorker`]: ./async_worker.md From baa03a56075a4fa309a167e32c3ab03f8c3d9cc7 Mon Sep 17 00:00:00 2001 From: legendecas Date: Mon, 3 Feb 2020 23:08:25 +0800 Subject: [PATCH 6/8] fix gcc complains about unused parameters --- napi-inl.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/napi-inl.h b/napi-inl.h index 8ec445382..bdd925d8b 100644 --- a/napi-inl.h +++ b/napi-inl.h @@ -4638,14 +4638,14 @@ inline void AsyncProgressWorkerBase::NonBlockingCall(DataType* data) { } template -inline void AsyncProgressWorkerBase::OnWorkComplete(Napi::Env env, napi_status status) { +inline void AsyncProgressWorkerBase::OnWorkComplete(Napi::Env /* env */, napi_status status) { _work_completed = true; _complete_status = status; _tsfn.Release(); } template -inline void AsyncProgressWorkerBase::OnThreadSafeFunctionFinalize(Napi::Env env, void* data, AsyncProgressWorkerBase* context) { +inline void AsyncProgressWorkerBase::OnThreadSafeFunctionFinalize(Napi::Env env, void* /* data */, AsyncProgressWorkerBase* context) { if (context->_work_completed) { context->AsyncWorker::OnWorkComplete(env, context->_complete_status); } From db62dfb09c1e2db49fbbdd94e9ae01ecb0c46aae Mon Sep 17 00:00:00 2001 From: legendecas Date: Mon, 3 Feb 2020 23:40:47 +0800 Subject: [PATCH 7/8] fix flaky async queue worker cancelling case --- doc/async_worker_variants.md | 10 +++++----- test/asyncprogressqueueworker.cc | 3 +++ test/asyncprogressqueueworker.js | 7 +++++++ 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/doc/async_worker_variants.md b/doc/async_worker_variants.md index 4a9a83404..356fb3de9 100644 --- a/doc/async_worker_variants.md +++ b/doc/async_worker_variants.md @@ -344,10 +344,10 @@ method that will queue the created worker for execution. # AsyncProgressQueueWorker `Napi::AsyncProgressQueueWorker` acts exactly like `Napi::AsyncProgressWorker` -except that each progress commited by `Napi::AsyncProgressQueueWorker::ExecutionProgress::Send` +except that each progress committed by `Napi::AsyncProgressQueueWorker::ExecutionProgress::Send` during `Napi::AsyncProgressQueueWorker::Execute` is guaranteed to be processed by `Napi::AsyncProgressQueueWorker::OnProgress` on JavaScript thread -by committing order order. +by committing order. For the most basic use, only the `Napi::AsyncProgressQueueWorker::Execute` and `Napi::AsyncProgressQueueWorker::OnProgress` method must be implemented in a subclass. @@ -368,9 +368,9 @@ The data pointed to will be copied to internal slots of `Napi::AsyncProgressQueu after the call to `Napi::AsyncProgressQueueWorker::ExecutionProcess::Send` the data can be safely released. -`Napi::AsyncProgressQueueWorker::ExecutionProcess::Send` guarantees **eventual** -invocation of `Napi::AsyncProgressQueueWorker::OnProgress`, which means -multiple send will be cast to orderly invocation of `Napi::AsyncProgressQueueWorker::OnProgress` +`Napi::AsyncProgressQueueWorker::ExecutionProcess::Send` guarantees invocation +of `Napi::AsyncProgressQueueWorker::OnProgress`, which means multiple send will +be cast to orderly invocation of `Napi::AsyncProgressQueueWorker::OnProgress` with each data. ```cpp diff --git a/test/asyncprogressqueueworker.cc b/test/asyncprogressqueueworker.cc index 849a861c5..ab439d95c 100644 --- a/test/asyncprogressqueueworker.cc +++ b/test/asyncprogressqueueworker.cc @@ -46,6 +46,9 @@ class TestWorker : public AsyncProgressQueueWorker { protected: void Execute(const ExecutionProgress& progress) override { + using namespace std::chrono_literals; + std::this_thread::sleep_for(1s); + if (_times < 0) { SetError("test error"); } diff --git a/test/asyncprogressqueueworker.js b/test/asyncprogressqueueworker.js index d21043f0f..a91112933 100644 --- a/test/asyncprogressqueueworker.js +++ b/test/asyncprogressqueueworker.js @@ -2,6 +2,7 @@ const buildType = process.config.target_defaults.default_configuration; const common = require('./common') const assert = require('assert'); +const os = require('os'); test(require(`./build/${buildType}/binding.node`)); test(require(`./build/${buildType}/binding_noexcept.node`)); @@ -44,6 +45,12 @@ function fail(binding) { } function cancel(binding) { + // make sure the work we are going to cancel will not be + // able to start by using all the threads in the pool. + for (let i = 0; i < os.cpus().length; ++i) { + const worker = binding.createWork(-1, () => {}, () => {}); + binding.queueWork(worker); + } const worker = binding.createWork(-1, () => { assert.fail('unexpected callback'); From a8e76c4e5fa7950632c2336895688d99636a0d83 Mon Sep 17 00:00:00 2001 From: legendecas Date: Fri, 28 Feb 2020 14:29:04 +0800 Subject: [PATCH 8/8] fixup chore issues --- doc/async_worker_variants.md | 24 +++++++++++++----------- napi-inl.h | 30 +++++++++++++++++++++++------- napi.h | 9 +-------- test/asyncprogressqueueworker.cc | 30 ++++++++++++++++++------------ test/asyncprogressqueueworker.js | 4 ++-- 5 files changed, 57 insertions(+), 40 deletions(-) diff --git a/doc/async_worker_variants.md b/doc/async_worker_variants.md index 356fb3de9..a1fb6787e 100644 --- a/doc/async_worker_variants.md +++ b/doc/async_worker_variants.md @@ -346,8 +346,8 @@ method that will queue the created worker for execution. `Napi::AsyncProgressQueueWorker` acts exactly like `Napi::AsyncProgressWorker` except that each progress committed by `Napi::AsyncProgressQueueWorker::ExecutionProgress::Send` during `Napi::AsyncProgressQueueWorker::Execute` is guaranteed to be -processed by `Napi::AsyncProgressQueueWorker::OnProgress` on JavaScript thread -by committing order. +processed by `Napi::AsyncProgressQueueWorker::OnProgress` on the JavaScript +thread in the order it was committed. For the most basic use, only the `Napi::AsyncProgressQueueWorker::Execute` and `Napi::AsyncProgressQueueWorker::OnProgress` method must be implemented in a subclass. @@ -369,9 +369,9 @@ after the call to `Napi::AsyncProgressQueueWorker::ExecutionProcess::Send` the d be safely released. `Napi::AsyncProgressQueueWorker::ExecutionProcess::Send` guarantees invocation -of `Napi::AsyncProgressQueueWorker::OnProgress`, which means multiple send will -be cast to orderly invocation of `Napi::AsyncProgressQueueWorker::OnProgress` -with each data. +of `Napi::AsyncProgressQueueWorker::OnProgress`, which means multiple `Send` +call will result in the in-order invocation of `Napi::AsyncProgressQueueWorker::OnProgress` +with each data item. ```cpp void Napi::AsyncProgressQueueWorker::ExecutionProcess::Send(const T* data, size_t count) const; @@ -420,12 +420,14 @@ class EchoWorker : public AsyncProgressQueueWorker { ``` The `EchoWorker`'s constructor calls the base class' constructor to pass in the -callback that the `Napi::AsyncProgressQueueWorker` base class will store persistently. When -the work on the `Napi::AsyncProgressQueueWorker::Execute` method is done the -`Napi::AsyncProgressQueueWorker::OnOk` method is called and the results are return back to -JavaScript when the stored callback is invoked with its associated environment. +callback that the `Napi::AsyncProgressQueueWorker` base class will store +persistently. When the work on the `Napi::AsyncProgressQueueWorker::Execute` +method is done the `Napi::AsyncProgressQueueWorker::OnOk` method is called and +the results are returned back to JavaScript when the stored callback is invoked +with its associated environment. -The following code shows an example of how to create and use an `Napi::AsyncProgressQueueWorker` +The following code shows an example of how to create and use an +`Napi::AsyncProgressQueueWorker`. ```cpp #include @@ -436,7 +438,7 @@ The following code shows an example of how to create and use an `Napi::AsyncProg using namespace Napi; Value Echo(const CallbackInfo& info) { - // We need to validate the arguments here + // We need to validate the arguments here. Function cb = info[1].As(); std::string in = info[0].As(); EchoWorker* wk = new EchoWorker(cb, in); diff --git a/napi-inl.h b/napi-inl.h index bdd925d8b..7d11e1b37 100644 --- a/napi-inl.h +++ b/napi-inl.h @@ -4596,8 +4596,16 @@ inline AsyncProgressWorkerBase::AsyncProgressWorkerBase(const Object& const Object& resource, size_t queue_size) : AsyncWorker(receiver, callback, resource_name, resource) { - // Fill all possible arguments to work around from ambiguous ThreadSafeFunction::New signatures. - _tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource, resource_name, queue_size, 1, this, OnThreadSafeFunctionFinalize, this); + // Fill all possible arguments to work around ambiguous ThreadSafeFunction::New signatures. + _tsfn = ThreadSafeFunction::New(callback.Env(), + callback, + resource, + resource_name, + queue_size, + /** initialThreadCount */ 1, + /** context */ this, + OnThreadSafeFunctionFinalize, + /** finalizeData */ this); } #if NAPI_VERSION > 4 @@ -4608,10 +4616,18 @@ inline AsyncProgressWorkerBase::AsyncProgressWorkerBase(Napi::Env env, size_t queue_size) : AsyncWorker(env, resource_name, resource) { // TODO: Once the changes to make the callback optional for threadsafe - // functions are no longer optional we can remove the dummy Function here. + // functions are available on all versions we can remove the dummy Function here. Function callback; - // Fill all possible arguments to work around from ambiguous ThreadSafeFunction::New signatures. - _tsfn = ThreadSafeFunction::New(env, callback, resource, resource_name, queue_size, 1, this, OnThreadSafeFunctionFinalize, this); + // Fill all possible arguments to work around ambiguous ThreadSafeFunction::New signatures. + _tsfn = ThreadSafeFunction::New(env, + callback, + resource, + resource_name, + queue_size, + /** initialThreadCount */ 1, + /** context */ this, + OnThreadSafeFunctionFinalize, + /** finalizeData */ this); } #endif @@ -4632,9 +4648,9 @@ inline void AsyncProgressWorkerBase::OnAsyncWorkProgress(Napi::Env /* } template -inline void AsyncProgressWorkerBase::NonBlockingCall(DataType* data) { +inline napi_status AsyncProgressWorkerBase::NonBlockingCall(DataType* data) { auto tsd = new AsyncProgressWorkerBase::ThreadSafeData(this, data); - _tsfn.NonBlockingCall(tsd, OnAsyncWorkProgress); + return _tsfn.NonBlockingCall(tsd, OnAsyncWorkProgress); } template diff --git a/napi.h b/napi.h index fdb172d82..6d62b8ba3 100644 --- a/napi.h +++ b/napi.h @@ -6,7 +6,6 @@ #include #include #include -#include #include #include @@ -1967,12 +1966,6 @@ namespace Napi { napi_async_context _context; }; - - inline void OnAsyncWorkExecute(napi_env env, void* asyncworker); - inline void OnAsyncWorkComplete(napi_env env, - napi_status status, - void* asyncworker); - class AsyncWorker { public: virtual ~AsyncWorker(); @@ -2287,7 +2280,7 @@ namespace Napi { Napi::Function jsCallback, void* data); - void NonBlockingCall(DataType* data); + napi_status NonBlockingCall(DataType* data); private: ThreadSafeFunction _tsfn; diff --git a/test/asyncprogressqueueworker.cc b/test/asyncprogressqueueworker.cc index ab439d95c..b30863301 100644 --- a/test/asyncprogressqueueworker.cc +++ b/test/asyncprogressqueueworker.cc @@ -22,26 +22,27 @@ class TestWorker : public AsyncProgressQueueWorker { Function cb = info[1].As(); Function progress = info[2].As(); - TestWorker* worker = new TestWorker(cb, progress, "TestResource", Object::New(info.Env())); - worker->_times = times; + TestWorker* worker = new TestWorker(cb, + progress, + "TestResource", + Object::New(info.Env()), + times); return Napi::External::New(info.Env(), worker); } - static Napi::Value QueueWork(const CallbackInfo& info) { + static void QueueWork(const CallbackInfo& info) { auto wrap = info[0].As>(); auto worker = wrap.Data(); worker->Queue(); - return Napi::Boolean::New(info.Env(), true); } - static Napi::Value CancelWork(const CallbackInfo& info) { + static void CancelWork(const CallbackInfo& info) { auto wrap = info[0].As>(); auto worker = wrap.Data(); // We cannot cancel a worker if it got started. So we have to do a quick cancel. worker->Queue(); worker->Cancel(); - return Napi::Boolean::New(info.Env(), true); } protected: @@ -61,20 +62,25 @@ class TestWorker : public AsyncProgressQueueWorker { void OnProgress(const ProgressData* data, size_t /* count */) override { Napi::Env env = Env(); - if (!_progress.IsEmpty()) { + if (!_js_progress_cb.IsEmpty()) { Number progress = Number::New(env, data->progress); - _progress.MakeCallback(Receiver().Value(), { progress }); + _js_progress_cb.Call(Receiver().Value(), { progress }); } } private: - TestWorker(Function cb, Function progress, const char* resource_name, const Object& resource) - : AsyncProgressQueueWorker(cb, resource_name, resource) { - _progress.Reset(progress, 1); + TestWorker(Function cb, + Function progress, + const char* resource_name, + const Object& resource, + int32_t times) + : AsyncProgressQueueWorker(cb, resource_name, resource), + _times(times) { + _js_progress_cb.Reset(progress, 1); } int32_t _times; - FunctionReference _progress; + FunctionReference _js_progress_cb; }; } // namespace diff --git a/test/asyncprogressqueueworker.js b/test/asyncprogressqueueworker.js index a91112933..6fa65520e 100644 --- a/test/asyncprogressqueueworker.js +++ b/test/asyncprogressqueueworker.js @@ -29,7 +29,7 @@ function success(binding) { actual.push(_progress); }, expected.length) ); - assert.strictEqual(binding.queueWork(worker), true); + binding.queueWork(worker); } function fail(binding) { @@ -41,7 +41,7 @@ function fail(binding) { assert.fail('unexpected progress report'); } ); - assert.strictEqual(binding.queueWork(worker), true); + binding.queueWork(worker); } function cancel(binding) {