From 7b409c563507038102f204b80f30ec8202eeab05 Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Sun, 12 Sep 2021 11:12:38 +0200 Subject: [PATCH 1/9] Close database on environment exit (cherry picked from commit Level/leveldown@8fdcaaa3bdadec3a2badf8bf13db28fbbfa9a93c) --- binding.cc | 53 ++++++++++++++++++++++++----- test/env-cleanup-hook-test.js | 33 ++++++++++++++++++ test/env-cleanup-hook.js | 59 +++++++++++++++++++++++++++++++++ test/iterator-recursion-test.js | 3 +- test/stack-blower.js | 7 +--- 5 files changed, 138 insertions(+), 17 deletions(-) create mode 100644 test/env-cleanup-hook-test.js create mode 100644 test/env-cleanup-hook.js diff --git a/binding.cc b/binding.cc index 67ac9a81..31c17dd0 100644 --- a/binding.cc +++ b/binding.cc @@ -481,15 +481,6 @@ struct Database { uint32_t priorityWork_; }; -/** - * Runs when a Database is garbage collected. - */ -static void FinalizeDatabase (napi_env env, void* data, void* hint) { - if (data) { - delete (Database*)data; - } -} - /** * Base worker class for doing async work that defers closing the database. */ @@ -718,11 +709,55 @@ struct Iterator { napi_ref ref_; }; +/** + * Hook for when the environment exits. This hook will be called after + * already-scheduled napi_async_work items have finished, which gives us + * the guarantee that no db operations will be in-flight at this time. + */ +static void env_cleanup_hook (void* arg) { + Database* database = (Database*)arg; + + // Do everything that db_close() does but synchronously. We're expecting that GC + // did not (yet) collect the database because that would be a user mistake (not + // closing their db) made during the lifetime of the environment. That's different + // from an environment being torn down (like the main process or a worker thread) + // where it's our responsibility to clean up. Note also, the following code must + // be a safe noop if called before db_open() or after db_close(). + if (database && database->db_ != NULL) { + std::map iterators = database->iterators_; + std::map::iterator it; + + for (it = iterators.begin(); it != iterators.end(); ++it) { + Iterator* iterator = it->second; + + if (!iterator->ended_) { + iterator->ended_ = true; + iterator->IteratorEnd(); + } + } + + // Having ended the iterators (and released snapshots) we can safely close. + database->CloseDatabase(); + } +} + +/** + * Runs when a Database is garbage collected. + */ +static void FinalizeDatabase (napi_env env, void* data, void* hint) { + if (data) { + Database* database = (Database*)data; + napi_remove_env_cleanup_hook(env, env_cleanup_hook, database); + delete database; + } +} + /** * Returns a context object for a database. */ NAPI_METHOD(db_init) { Database* database = new Database(env); + napi_add_env_cleanup_hook(env, env_cleanup_hook, database); napi_value result; NAPI_STATUS_THROWS(napi_create_external(env, database, diff --git a/test/env-cleanup-hook-test.js b/test/env-cleanup-hook-test.js new file mode 100644 index 00000000..cf20143d --- /dev/null +++ b/test/env-cleanup-hook-test.js @@ -0,0 +1,33 @@ +'use strict' + +const test = require('tape') +const fork = require('child_process').fork +const path = require('path') + +// Test env_cleanup_hook at several stages of a db lifetime +addTest(['create']) +addTest(['create', 'open']) +addTest(['create', 'open', 'create-iterator']) +addTest(['create', 'open', 'create-iterator', 'close']) +addTest(['create', 'open', 'create-iterator', 'nexting']) +addTest(['create', 'open', 'create-iterator', 'nexting', 'close']) +addTest(['create', 'open', 'close']) +addTest(['create', 'open-error']) + +function addTest (steps) { + test(`cleanup on environment exit (${steps.join(', ')})`, function (t) { + t.plan(3) + + const child = fork(path.join(__dirname, 'env-cleanup-hook.js'), steps) + + child.on('message', function (m) { + t.is(m, steps[steps.length - 1], `got to step: ${m}`) + child.disconnect() + }) + + child.on('exit', function (code, sig) { + t.is(code, 0, 'child exited normally') + t.is(sig, null, 'not terminated due to signal') + }) + }) +} diff --git a/test/env-cleanup-hook.js b/test/env-cleanup-hook.js new file mode 100644 index 00000000..2c20a633 --- /dev/null +++ b/test/env-cleanup-hook.js @@ -0,0 +1,59 @@ +'use strict' + +const testCommon = require('./common') + +function test (steps) { + let step + + function nextStep () { + step = steps.shift() || step + return step + } + + if (nextStep() !== 'create') { + // Send a message triggering an environment exit + // and indicating at which step we stopped. + return process.send(step) + } + + const db = testCommon.factory() + + if (nextStep() !== 'open') { + if (nextStep() === 'open-error') { + // If opening fails the cleanup hook should be a noop. + db.open({ createIfMissing: false, errorIfExists: true }, function (err) { + if (!err) throw new Error('Expected an open() error') + }) + } + + return process.send(step) + } + + // Open the db, expected to be closed by the cleanup hook. + db.open(function (err) { + if (err) throw err + + if (nextStep() === 'create-iterator') { + // Create an iterator, expected to be ended by the cleanup hook. + const it = db.iterator() + + if (nextStep() === 'nexting') { + // This async work should finish before the cleanup hook is called. + it.next(function (err) { + if (err) throw err + }) + } + } + + if (nextStep() === 'close') { + // Close the db, after which the cleanup hook is a noop. + db.close(function (err) { + if (err) throw err + }) + } + + process.send(step) + }) +} + +test(process.argv.slice(2)) diff --git a/test/iterator-recursion-test.js b/test/iterator-recursion-test.js index fbdc4293..89eb8e38 100644 --- a/test/iterator-recursion-test.js +++ b/test/iterator-recursion-test.js @@ -27,8 +27,7 @@ test('setUp common', testCommon.setUp) // call in our Iterator to segfault. This was fixed in 2014 (commit 85e6a38). // // Today (2020), we see occasional failures in CI again. We no longer call -// node::FatalException() so there's a new reason. Possibly related to -// https://github.com/Level/leveldown/issues/667. +// node::FatalException() so there's a new reason. test.skip('try to create an iterator with a blown stack', function (t) { for (let i = 0; i < 100; i++) { t.test(`try to create an iterator with a blown stack (${i})`, function (t) { diff --git a/test/stack-blower.js b/test/stack-blower.js index 390bcbb5..3fda3de4 100644 --- a/test/stack-blower.js +++ b/test/stack-blower.js @@ -21,12 +21,7 @@ if (process.argv[2] === 'run') { try { recurse() } catch (e) { - // Closing before process exit is normally not needed. This is a - // temporary workaround for Level/leveldown#667. - db.close(function (err) { - if (err) throw err - process.send('Catchable error at depth ' + depth) - }) + process.send('Catchable error at depth ' + depth) } }) } From 07316be03599799441826ba02f5a0bc9f527351b Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Sat, 7 Dec 2019 11:39:39 +0200 Subject: [PATCH 2/9] Refactor: move CheckEndCallback to Iterator (cherry picked from commit Level/leveldown@d3453fbde4d2a8aa04d9091101c25c999649069b) --- binding.cc | 38 ++++++++++++++------------------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/binding.cc b/binding.cc index 31c17dd0..1dd81561 100644 --- a/binding.cc +++ b/binding.cc @@ -31,7 +31,6 @@ class NullLogger : public rocksdb::Logger { */ struct Database; struct Iterator; -struct EndWorker; static void iterator_end_do (napi_env env, Iterator* iterator, napi_value cb); /** @@ -573,6 +572,15 @@ struct Iterator { database_->ReleaseSnapshot(options_->snapshot); } + void CheckEndCallback () { + nexting_ = false; + + if (endWorker_ != NULL) { + endWorker_->Queue(); + endWorker_ = NULL; + } + } + bool GetIterator () { if (dbIterator_ != NULL) return false; @@ -703,7 +711,7 @@ struct Iterator { bool ended_; leveldb::ReadOptions* options_; - EndWorker* endWorker_; + BaseWorker* endWorker_; private: napi_ref ref_; @@ -1449,30 +1457,16 @@ NAPI_METHOD(iterator_end) { NAPI_RETURN_UNDEFINED(); } -/** - * TODO Move this to Iterator. There isn't any reason - * for this function being a separate function pointer. - */ -void CheckEndCallback (Iterator* iterator) { - iterator->nexting_ = false; - if (iterator->endWorker_ != NULL) { - iterator->endWorker_->Queue(); - iterator->endWorker_ = NULL; - } -} - /** * Worker class for nexting an iterator. */ struct NextWorker final : public BaseWorker { NextWorker (napi_env env, Iterator* iterator, - napi_value callback, - void (*localCallback)(Iterator*)) + napi_value callback) : BaseWorker(env, iterator->database_, callback, "leveldown.iterator.next"), - iterator_(iterator), - localCallback_(localCallback) {} + iterator_(iterator) {} ~NextWorker () {} @@ -1513,8 +1507,7 @@ struct NextWorker final : public BaseWorker { } // clean up & handle the next/end state - // TODO this should just do iterator_->CheckEndCallback(); - localCallback_(iterator_); + iterator_->CheckEndCallback(); napi_value argv[3]; napi_get_null(env_, &argv[0]); @@ -1526,8 +1519,6 @@ struct NextWorker final : public BaseWorker { } Iterator* iterator_; - // TODO why do we need a function pointer for this? - void (*localCallback_)(Iterator*); std::vector > result_; bool ok_; }; @@ -1548,8 +1539,7 @@ NAPI_METHOD(iterator_next) { NAPI_RETURN_UNDEFINED(); } - NextWorker* worker = new NextWorker(env, iterator, callback, - CheckEndCallback); + NextWorker* worker = new NextWorker(env, iterator, callback); iterator->nexting_ = true; worker->Queue(); From e44d3a52d33f3b927f80cffcfd0bac70c1ab9b76 Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Sun, 12 Sep 2021 21:23:55 +0200 Subject: [PATCH 3/9] Make `db.clear()` 27x faster by doing it natively Because this uses an iterator under the hood, it also refactors shared code between `db.iterator()` and `db.clear()`. (cherry picked from commit Level/leveldown@aedf49e3bce8bf99d0710809cb859d7243e429a2) --- binding.cc | 465 +++++++++++++++++++++++++++--------------- leveldown.js | 4 + test/iterator-test.js | 22 ++ 3 files changed, 321 insertions(+), 170 deletions(-) diff --git a/binding.cc b/binding.cc index 1dd81561..6a1badbf 100644 --- a/binding.cc +++ b/binding.cc @@ -170,7 +170,7 @@ static uint32_t Uint32Property (napi_env env, napi_value obj, const char* key, } /** - * Returns a uint32 property 'key' from 'obj'. + * Returns a int32 property 'key' from 'obj'. * Returns 'DEFAULT' if the property doesn't exist. */ static int Int32Property (napi_env env, napi_value obj, const char* key, @@ -305,11 +305,13 @@ struct BaseWorker { self->DoExecute(); } - void SetStatus (leveldb::Status status) { + bool SetStatus (leveldb::Status status) { status_ = status; if (!status.ok()) { SetErrorMessage(status.ToString().c_str()); + return false; } + return true; } void SetErrorMessage(const char *msg) { @@ -499,50 +501,36 @@ struct PriorityWorker : public BaseWorker { /** * Owns a leveldb iterator. */ -struct Iterator { - Iterator (Database* database, - uint32_t id, - bool reverse, - bool keys, - bool values, - int limit, - std::string* lt, - std::string* lte, - std::string* gt, - std::string* gte, - bool fillCache, - bool keyAsBuffer, - bool valueAsBuffer, - uint32_t highWaterMark) +struct BaseIterator { + BaseIterator(Database* database, + bool reverse, + std::string* lt, + std::string* lte, + std::string* gt, + std::string* gte, + int limit, + bool fillCache) : database_(database), - id_(id), + isEnding_(false), + hasEnded_(false), + didSeek_(false), reverse_(reverse), - keys_(keys), - values_(values), - limit_(limit), lt_(lt), lte_(lte), gt_(gt), gte_(gte), - keyAsBuffer_(keyAsBuffer), - valueAsBuffer_(valueAsBuffer), - highWaterMark_(highWaterMark), - dbIterator_(NULL), + limit_(limit), count_(0), - seeking_(false), - landed_(false), - nexting_(false), - ended_(false), - endWorker_(NULL), - ref_(NULL) { + eof_(false) { options_ = new leveldb::ReadOptions(); options_->fill_cache = fillCache; options_->verify_checksums = false; options_->snapshot = database->NewSnapshot(); + dbIterator_ = database_->NewIterator(options_); } - ~Iterator () { - assert(ended_); + ~BaseIterator () { + assert(hasEnded_); if (lt_ != NULL) delete lt_; if (gt_ != NULL) delete gt_; @@ -552,39 +540,15 @@ struct Iterator { delete options_; } - void Attach (napi_ref ref) { - ref_ = ref; - database_->AttachIterator(id_, this); + bool DidSeek () { + return didSeek_; } - napi_ref Detach () { - database_->DetachIterator(id_); - return ref_; - } - - leveldb::Status IteratorStatus () { - return dbIterator_->status(); - } - - void IteratorEnd () { - delete dbIterator_; - dbIterator_ = NULL; - database_->ReleaseSnapshot(options_->snapshot); - } - - void CheckEndCallback () { - nexting_ = false; - - if (endWorker_ != NULL) { - endWorker_->Queue(); - endWorker_ = NULL; - } - } - - bool GetIterator () { - if (dbIterator_ != NULL) return false; - - dbIterator_ = database_->NewIterator(options_); + /** + * Seek to the first relevant key based on range options. + */ + void SeekToRange () { + didSeek_ = true; if (!reverse_ && gte_ != NULL) { dbIterator_->Seek(*gte_); @@ -615,44 +579,91 @@ struct Iterator { } else { dbIterator_->SeekToFirst(); } - - return true; } - bool Read (std::string& key, std::string& value) { - if (!GetIterator() && !seeking_) { + /** + * Seek manually (during iteration). + */ + void Seek (leveldb::Slice& target) { + didSeek_ = true; + + if (OutOfRange(target)) { if (reverse_) { + dbIterator_->SeekToFirst(); dbIterator_->Prev(); - } - else { + } else { + dbIterator_->SeekToLast(); dbIterator_->Next(); } + + return; } - seeking_ = false; + dbIterator_->Seek(target); if (dbIterator_->Valid()) { - std::string keyStr = dbIterator_->key().ToString(); - - if ((limit_ < 0 || ++count_ <= limit_) - && ( lt_ != NULL ? (lt_->compare(keyStr) > 0) - : lte_ != NULL ? (lte_->compare(keyStr) >= 0) - : true ) - && ( gt_ != NULL ? (gt_->compare(keyStr) < 0) - : gte_ != NULL ? (gte_->compare(keyStr) <= 0) - : true ) - ) { - if (keys_) { - key.assign(dbIterator_->key().data(), dbIterator_->key().size()); - } - if (values_) { - value.assign(dbIterator_->value().data(), dbIterator_->value().size()); + int cmp = dbIterator_->key().compare(target); + if (cmp > 0 && reverse_) { + dbIterator_->Prev(); + } else if (cmp < 0 && !reverse_) { + dbIterator_->Next(); + } + } else { + if (reverse_) { + dbIterator_->SeekToLast(); + } else { + dbIterator_->SeekToFirst(); + } + if (dbIterator_->Valid()) { + int cmp = dbIterator_->key().compare(target); + if (cmp > 0 && reverse_) { + dbIterator_->SeekToFirst(); + dbIterator_->Prev(); + } else if (cmp < 0 && !reverse_) { + dbIterator_->SeekToLast(); + dbIterator_->Next(); } - return true; } } + } - return false; + void End () { + if (!hasEnded_) { + hasEnded_ = true; + delete dbIterator_; + dbIterator_ = NULL; + database_->ReleaseSnapshot(options_->snapshot); + } + } + + bool ReadOne () { + if (eof_ || !dbIterator_->Valid()) { + return false; + } + + if ((limit_ >= 0 && ++count_ > limit_) || OutOfRange(dbIterator_->key())) { + eof_ = true; + return false; + } + + return true; + } + + void Advance () { + if (reverse_) dbIterator_->Prev(); + else dbIterator_->Next(); + } + + leveldb::Slice CurrentKey () { + return dbIterator_->key(); + } + + leveldb::Slice CurrentValue () { + return dbIterator_->value(); + } + + leveldb::Status Status () { + return dbIterator_->status(); } bool OutOfRange (leveldb::Slice& target) { @@ -662,55 +673,119 @@ struct Iterator { (gte_ != NULL && target.compare(*gte_) < 0)); } - bool IteratorNext (std::vector >& result) { - size_t size = 0; - uint32_t cacheSize = 0; + Database* database_; + bool isEnding_; + bool hasEnded_; - while (true) { +private: + leveldb::Iterator* dbIterator_; + bool didSeek_; + bool reverse_; + std::string* lt_; + std::string* lte_; + std::string* gt_; + std::string* gte_; + int limit_; + int count_; + bool eof_; + leveldb::ReadOptions* options_; +}; + +/** + * Extends BaseIterator for reading it from JS land. + */ +struct Iterator final : public BaseIterator { + Iterator (Database* database, + uint32_t id, + bool reverse, + bool keys, + bool values, + int limit, + std::string* lt, + std::string* lte, + std::string* gt, + std::string* gte, + bool fillCache, + bool keyAsBuffer, + bool valueAsBuffer, + uint32_t highWaterMark) + : BaseIterator(database, reverse, lt, lte, gt, gte, limit, fillCache), + id_(id), + keys_(keys), + values_(values), + keyAsBuffer_(keyAsBuffer), + valueAsBuffer_(valueAsBuffer), + highWaterMark_(highWaterMark), + landed_(false), + nexting_(false), + endWorker_(NULL), + ref_(NULL) { + } + + ~Iterator () {} + + void Attach (napi_ref ref) { + ref_ = ref; + database_->AttachIterator(id_, this); + } + + napi_ref Detach () { + database_->DetachIterator(id_); + return ref_; + } + + void CheckEndCallback () { + nexting_ = false; + + if (endWorker_ != NULL) { + endWorker_->Queue(); + endWorker_ = NULL; + } + } + + bool ReadMany (uint32_t size, std::vector>& result) { + size_t bytesRead = 0; + + while (ReadOne()) { std::string key, value; - bool ok = Read(key, value); - if (ok) { - result.push_back(std::make_pair(key, value)); + if (keys_) { + leveldb::Slice slice = CurrentKey(); + key.assign(slice.data(), slice.size()); + bytesRead += key.size(); + } - if (!landed_) { - landed_ = true; - return true; - } + if (values_) { + leveldb::Slice slice = CurrentValue(); + value.assign(slice.data(), slice.size()); + bytesRead += value.size(); + } - size = size + key.size() + value.size(); - if (size > highWaterMark_) return true; + Advance(); + result.push_back(std::make_pair(key, value)); - // Limit the size of the cache to prevent starving the event loop - // in JS-land while we're recursively calling process.nextTick(). - if (++cacheSize >= 1000) return true; - } else { - return false; + if (!landed_) { + landed_ = true; + return true; + } + + if (bytesRead > highWaterMark_ || result.size() >= size) { + return true; } } + + return false; } - Database* database_; uint32_t id_; - bool reverse_; bool keys_; bool values_; - int limit_; - std::string* lt_; - std::string* lte_; - std::string* gt_; - std::string* gte_; bool keyAsBuffer_; bool valueAsBuffer_; uint32_t highWaterMark_; - leveldb::Iterator* dbIterator_; - int count_; - bool seeking_; bool landed_; bool nexting_; - bool ended_; - leveldb::ReadOptions* options_; BaseWorker* endWorker_; private: @@ -736,12 +811,7 @@ static void env_cleanup_hook (void* arg) { std::map::iterator it; for (it = iterators.begin(); it != iterators.end(); ++it) { - Iterator* iterator = it->second; - - if (!iterator->ended_) { - iterator->ended_ = true; - iterator->IteratorEnd(); - } + it->second->End(); } // Having ended the iterators (and released snapshots) we can safely close. @@ -1095,6 +1165,91 @@ NAPI_METHOD(db_del) { NAPI_RETURN_UNDEFINED(); } +/** + * Worker class for deleting a range from a database. + */ +struct ClearWorker final : public PriorityWorker { + ClearWorker (napi_env env, + Database* database, + napi_value callback, + bool reverse, + int limit, + std::string* lt, + std::string* lte, + std::string* gt, + std::string* gte) + : PriorityWorker(env, database, callback, "leveldown.db.clear") { + baseIterator_ = new BaseIterator(database, reverse, lt, lte, gt, gte, limit, false); + writeOptions_ = new leveldb::WriteOptions(); + writeOptions_->sync = false; + } + + ~ClearWorker () { + // TODO: write GC tests + delete baseIterator_; + delete writeOptions_; + } + + void DoExecute () override { + baseIterator_->SeekToRange(); + + // TODO: add option + uint32_t hwm = 16 * 1024; + leveldb::WriteBatch batch; + + while (true) { + size_t bytesRead = 0; + + while (bytesRead < hwm && baseIterator_->ReadOne()) { + leveldb::Slice key = baseIterator_->CurrentKey(); + batch.Delete(key); + bytesRead += key.size(); + baseIterator_->Advance(); + } + + if (!SetStatus(baseIterator_->Status()) || bytesRead == 0) { + break; + } + + if (!SetStatus(database_->WriteBatch(*writeOptions_, &batch))) { + break; + } + + batch.Clear(); + } + + baseIterator_->End(); + } + +private: + BaseIterator* baseIterator_; + leveldb::WriteOptions* writeOptions_; +}; + +/** + * Delete a range from a database. + */ +NAPI_METHOD(db_clear) { + NAPI_ARGV(3); + NAPI_DB_CONTEXT(); + + napi_value options = argv[1]; + napi_value callback = argv[2]; + + bool reverse = BooleanProperty(env, options, "reverse", false); + int limit = Int32Property(env, options, "limit", -1); + + std::string* lt = RangeOption(env, options, "lt"); + std::string* lte = RangeOption(env, options, "lte"); + std::string* gt = RangeOption(env, options, "gt"); + std::string* gte = RangeOption(env, options, "gte"); + + ClearWorker* worker = new ClearWorker(env, database, callback, reverse, limit, lt, lte, gt, gte); + worker->Queue(); + + NAPI_RETURN_UNDEFINED(); +} + /** * Worker class for calculating the size of a range. */ @@ -1354,51 +1509,13 @@ NAPI_METHOD(iterator_seek) { NAPI_ARGV(2); NAPI_ITERATOR_CONTEXT(); - if (iterator->ended_) { + if (iterator->isEnding_ || iterator->hasEnded_) { napi_throw_error(env, NULL, "iterator has ended"); } leveldb::Slice target = ToSlice(env, argv[1]); - iterator->GetIterator(); - - leveldb::Iterator* dbIterator = iterator->dbIterator_; - dbIterator->Seek(target); - - iterator->seeking_ = true; iterator->landed_ = false; - - if (iterator->OutOfRange(target)) { - if (iterator->reverse_) { - dbIterator->SeekToFirst(); - dbIterator->Prev(); - } else { - dbIterator->SeekToLast(); - dbIterator->Next(); - } - } else if (dbIterator->Valid()) { - int cmp = dbIterator->key().compare(target); - if (cmp > 0 && iterator->reverse_) { - dbIterator->Prev(); - } else if (cmp < 0 && !iterator->reverse_) { - dbIterator->Next(); - } - } else { - if (iterator->reverse_) { - dbIterator->SeekToLast(); - } else { - dbIterator->SeekToFirst(); - } - if (dbIterator->Valid()) { - int cmp = dbIterator->key().compare(target); - if (cmp > 0 && iterator->reverse_) { - dbIterator->SeekToFirst(); - dbIterator->Prev(); - } else if (cmp < 0 && !iterator->reverse_) { - dbIterator->SeekToLast(); - dbIterator->Next(); - } - } - } + iterator->Seek(target); DisposeSliceBuffer(target); NAPI_RETURN_UNDEFINED(); @@ -1417,7 +1534,7 @@ struct EndWorker final : public BaseWorker { ~EndWorker () {} void DoExecute () override { - iterator_->IteratorEnd(); + iterator_->End(); } void HandleOKCallback () override { @@ -1433,9 +1550,9 @@ struct EndWorker final : public BaseWorker { * open iterators during NAPI_METHOD(db_close). */ static void iterator_end_do (napi_env env, Iterator* iterator, napi_value cb) { - if (!iterator->ended_) { + if (!iterator->isEnding_ && !iterator->hasEnded_) { EndWorker* worker = new EndWorker(env, iterator, cb); - iterator->ended_ = true; + iterator->isEnding_ = true; if (iterator->nexting_) { iterator->endWorker_ = worker; @@ -1471,9 +1588,16 @@ struct NextWorker final : public BaseWorker { ~NextWorker () {} void DoExecute () override { - ok_ = iterator_->IteratorNext(result_); + if (!iterator_->DidSeek()) { + iterator_->SeekToRange(); + } + + // Limit the size of the cache to prevent starving the event loop + // in JS-land while we're recursively calling process.nextTick(). + ok_ = iterator_->ReadMany(1000, result_); + if (!ok_) { - SetStatus(iterator_->IteratorStatus()); + SetStatus(iterator_->Status()); } } @@ -1532,7 +1656,7 @@ NAPI_METHOD(iterator_next) { napi_value callback = argv[1]; - if (iterator->ended_) { + if (iterator->isEnding_ || iterator->hasEnded_) { napi_value argv = CreateError(env, "iterator has ended"); CallFunction(env, callback, 1, &argv); @@ -1796,6 +1920,7 @@ NAPI_INIT() { NAPI_EXPORT_FUNCTION(db_put); NAPI_EXPORT_FUNCTION(db_get); NAPI_EXPORT_FUNCTION(db_del); + NAPI_EXPORT_FUNCTION(db_clear); NAPI_EXPORT_FUNCTION(db_approximate_size); NAPI_EXPORT_FUNCTION(db_compact_range); NAPI_EXPORT_FUNCTION(db_get_property); diff --git a/leveldown.js b/leveldown.js index 67f993e5..226a518c 100644 --- a/leveldown.js +++ b/leveldown.js @@ -63,6 +63,10 @@ LevelDOWN.prototype._del = function (key, options, callback) { binding.db_del(this.context, key, options, callback) } +LevelDOWN.prototype._clear = function (options, callback) { + binding.db_clear(this.context, options, callback) +} + LevelDOWN.prototype._chainedBatch = function () { return new ChainedBatch(this) } diff --git a/test/iterator-test.js b/test/iterator-test.js index 760e7360..413430a1 100644 --- a/test/iterator-test.js +++ b/test/iterator-test.js @@ -88,3 +88,25 @@ make('close db with open iterator', function (db, t, done) { done(null, false) }) }) + +make('key-only iterator', function (db, t, done) { + const it = db.iterator({ values: false, keyAsBuffer: false, valueAsBuffer: false }) + + it.next(function (err, key, value) { + t.ifError(err, 'no next() error') + t.is(key, 'one') + t.is(value, '') // should this be undefined? + it.end(done) + }) +}) + +make('value-only iterator', function (db, t, done) { + const it = db.iterator({ keys: false, keyAsBuffer: false, valueAsBuffer: false }) + + it.next(function (err, key, value) { + t.ifError(err, 'no next() error') + t.is(key, '') // should this be undefined? + t.is(value, '1') + it.end(done) + }) +}) From c79f5ab7623a46d24b4590fcab8f9f8ff82f0511 Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Sun, 19 Sep 2021 01:26:19 +0200 Subject: [PATCH 4/9] Prevent GC of db during `clear()` and other operations (cherry picked from commit Level/leveldown@9a3f59aede9a6356efc13c8e8b31f6592b7aa64b) --- .github/workflows/test.yml | 2 ++ binding.cc | 19 ++++++++++++--- test/clear-gc-test.js | 47 ++++++++++++++++++++++++++++++++++++++ test/gc.js | 1 + 4 files changed, 66 insertions(+), 3 deletions(-) create mode 100644 test/clear-gc-test.js diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a647deb8..0d6c9910 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -36,3 +36,5 @@ jobs: uses: GabrielBB/xvfb-action@v1 with: run: npm run test-electron + - name: Test GC + run: npm run test-gc diff --git a/binding.cc b/binding.cc index 6a1badbf..4777609a 100644 --- a/binding.cc +++ b/binding.cc @@ -373,9 +373,14 @@ struct Database { db_(NULL), currentIteratorId_(0), pendingCloseWorker_(NULL), + ref_(NULL), priorityWork_(0) {} ~Database () { + if (ref_ != NULL) { + napi_delete_reference(env_, ref_); + } + if (db_ != NULL) { delete db_; db_ = NULL; @@ -458,11 +463,13 @@ struct Database { } void IncrementPriorityWork () { - ++priorityWork_; + napi_reference_ref(env_, ref_, &priorityWork_); } void DecrementPriorityWork () { - if (--priorityWork_ == 0 && pendingCloseWorker_ != NULL) { + napi_reference_unref(env_, ref_, &priorityWork_); + + if (priorityWork_ == 0 && pendingCloseWorker_ != NULL) { pendingCloseWorker_->Queue(); pendingCloseWorker_ = NULL; } @@ -477,6 +484,7 @@ struct Database { uint32_t currentIteratorId_; BaseWorker *pendingCloseWorker_; std::map< uint32_t, Iterator * > iterators_; + napi_ref ref_; private: uint32_t priorityWork_; @@ -841,11 +849,16 @@ NAPI_METHOD(db_init) { NAPI_STATUS_THROWS(napi_create_external(env, database, FinalizeDatabase, NULL, &result)); + + // Reference counter to prevent GC of database while priority workers are active + NAPI_STATUS_THROWS(napi_create_reference(env, result, 0, &database->ref_)); + return result; } /** * Worker class for opening a database. + * TODO: shouldn't this be a PriorityWorker? */ struct OpenWorker final : public BaseWorker { OpenWorker (napi_env env, @@ -1185,7 +1198,6 @@ struct ClearWorker final : public PriorityWorker { } ~ClearWorker () { - // TODO: write GC tests delete baseIterator_; delete writeOptions_; } @@ -1538,6 +1550,7 @@ struct EndWorker final : public BaseWorker { } void HandleOKCallback () override { + // TODO: if we don't use EndWorker, do we still delete the reference? napi_delete_reference(env_, iterator_->Detach()); BaseWorker::HandleOKCallback(); } diff --git a/test/clear-gc-test.js b/test/clear-gc-test.js new file mode 100644 index 00000000..2794878a --- /dev/null +++ b/test/clear-gc-test.js @@ -0,0 +1,47 @@ +'use strict' + +const test = require('tape') +const testCommon = require('./common') +const sourceData = [] + +for (let i = 0; i < 1e3; i++) { + sourceData.push({ + type: 'put', + key: i.toString(), + value: Math.random().toString() + }) +} + +test('db without ref does not get GCed while clear() is in progress', function (t) { + t.plan(4) + + let db = testCommon.factory() + + db.open(function (err) { + t.ifError(err, 'no open error') + + // Insert test data + db.batch(sourceData.slice(), function (err) { + t.ifError(err, 'no batch error') + + // Start async work + db.clear(function () { + t.pass('got callback') + + // Give GC another chance to run, to rule out other issues. + setImmediate(function () { + if (global.gc) global.gc() + t.pass() + }) + }) + + // Remove reference. The db should not get garbage collected + // until after the clear() callback, thanks to a napi_ref. + db = null + + // Useful for manual testing with "node --expose-gc". + // The pending tap assertion may also allow GC to kick in. + if (global.gc) global.gc() + }) + }) +}) diff --git a/test/gc.js b/test/gc.js index a5655f91..956cc2ad 100644 --- a/test/gc.js +++ b/test/gc.js @@ -10,3 +10,4 @@ if (!global.gc) { require('./cleanup-hanging-iterators-test') require('./iterator-gc-test') require('./chained-batch-gc-test') +require('./clear-gc-test') From ea19d08ca626db4625fe16ec60188ce238f2a850 Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Mon, 20 Sep 2021 10:35:33 +0200 Subject: [PATCH 5/9] Refactor: avoid storing `napi_env` (cherry picked from commit Level/leveldown@0f88586656e10a140bb7953e2469cb68cd9a9fba) --- binding.cc | 219 +++++++++++++++++++++++++++-------------------------- 1 file changed, 111 insertions(+), 108 deletions(-) diff --git a/binding.cc b/binding.cc index 4777609a..a5ae461d 100644 --- a/binding.cc +++ b/binding.cc @@ -277,17 +277,18 @@ static napi_status CallFunction (napi_env env, * - DoFinally (main thread): do cleanup regardless of success */ struct BaseWorker { + // Note: storing env is discouraged as we'd end up using it in unsafe places. BaseWorker (napi_env env, Database* database, napi_value callback, const char* resourceName) - : env_(env), database_(database), errMsg_(NULL) { - NAPI_STATUS_THROWS_VOID(napi_create_reference(env_, callback, 1, &callbackRef_)); + : database_(database), errMsg_(NULL) { + NAPI_STATUS_THROWS_VOID(napi_create_reference(env, callback, 1, &callbackRef_)); napi_value asyncResourceName; - NAPI_STATUS_THROWS_VOID(napi_create_string_utf8(env_, resourceName, + NAPI_STATUS_THROWS_VOID(napi_create_string_utf8(env, resourceName, NAPI_AUTO_LENGTH, &asyncResourceName)); - NAPI_STATUS_THROWS_VOID(napi_create_async_work(env_, callback, + NAPI_STATUS_THROWS_VOID(napi_create_async_work(env, callback, asyncResourceName, BaseWorker::Execute, BaseWorker::Complete, @@ -296,12 +297,13 @@ struct BaseWorker { virtual ~BaseWorker () { delete [] errMsg_; - napi_delete_reference(env_, callbackRef_); - napi_delete_async_work(env_, asyncWork_); } static void Execute (napi_env env, void* data) { BaseWorker* self = (BaseWorker*)data; + + // Don't pass env to DoExecute() because use of Node-API + // methods should generally be avoided in async work. self->DoExecute(); } @@ -322,39 +324,43 @@ struct BaseWorker { } virtual void DoExecute () = 0; - virtual void DoFinally () {}; + virtual void DoFinally (napi_env env) {}; static void Complete (napi_env env, napi_status status, void* data) { BaseWorker* self = (BaseWorker*)data; - self->DoComplete(); - self->DoFinally(); + + self->DoComplete(env); + self->DoFinally(env); + + napi_delete_reference(env, self->callbackRef_); + napi_delete_async_work(env, self->asyncWork_); + delete self; } - void DoComplete () { + void DoComplete (napi_env env) { if (status_.ok()) { - return HandleOKCallback(); + return HandleOKCallback(env); } - napi_value argv = CreateError(env_, errMsg_); + napi_value argv = CreateError(env, errMsg_); napi_value callback; - napi_get_reference_value(env_, callbackRef_, &callback); - CallFunction(env_, callback, 1, &argv); + napi_get_reference_value(env, callbackRef_, &callback); + CallFunction(env, callback, 1, &argv); } - virtual void HandleOKCallback () { + virtual void HandleOKCallback (napi_env env) { napi_value argv; - napi_get_null(env_, &argv); + napi_get_null(env, &argv); napi_value callback; - napi_get_reference_value(env_, callbackRef_, &callback); - CallFunction(env_, callback, 1, &argv); + napi_get_reference_value(env, callbackRef_, &callback); + CallFunction(env, callback, 1, &argv); } - void Queue () { - napi_queue_async_work(env_, asyncWork_); + void Queue (napi_env env) { + napi_queue_async_work(env, asyncWork_); } - napi_env env_; napi_ref callbackRef_; napi_async_work asyncWork_; Database* database_; @@ -368,19 +374,14 @@ struct BaseWorker { * Owns the LevelDB storage, cache, filter policy and iterators. */ struct Database { - Database (napi_env env) - : env_(env), - db_(NULL), + Database () + : db_(NULL), currentIteratorId_(0), pendingCloseWorker_(NULL), ref_(NULL), priorityWork_(0) {} ~Database () { - if (ref_ != NULL) { - napi_delete_reference(env_, ref_); - } - if (db_ != NULL) { delete db_; db_ = NULL; @@ -452,25 +453,25 @@ struct Database { return db_->ReleaseSnapshot(snapshot); } - void AttachIterator (uint32_t id, Iterator* iterator) { + void AttachIterator (napi_env env, uint32_t id, Iterator* iterator) { iterators_[id] = iterator; - IncrementPriorityWork(); + IncrementPriorityWork(env); } - void DetachIterator (uint32_t id) { + void DetachIterator (napi_env env, uint32_t id) { iterators_.erase(id); - DecrementPriorityWork(); + DecrementPriorityWork(env); } - void IncrementPriorityWork () { - napi_reference_ref(env_, ref_, &priorityWork_); + void IncrementPriorityWork (napi_env env) { + napi_reference_ref(env, ref_, &priorityWork_); } - void DecrementPriorityWork () { - napi_reference_unref(env_, ref_, &priorityWork_); + void DecrementPriorityWork (napi_env env) { + napi_reference_unref(env, ref_, &priorityWork_); if (priorityWork_ == 0 && pendingCloseWorker_ != NULL) { - pendingCloseWorker_->Queue(); + pendingCloseWorker_->Queue(env); pendingCloseWorker_ = NULL; } } @@ -479,7 +480,6 @@ struct Database { return priorityWork_ > 0; } - napi_env env_; leveldb::DB* db_; uint32_t currentIteratorId_; BaseWorker *pendingCloseWorker_; @@ -496,13 +496,13 @@ struct Database { struct PriorityWorker : public BaseWorker { PriorityWorker (napi_env env, Database* database, napi_value callback, const char* resourceName) : BaseWorker(env, database, callback, resourceName) { - database_->IncrementPriorityWork(); + database_->IncrementPriorityWork(env); } - ~PriorityWorker () {} + virtual ~PriorityWorker () {} - void DoFinally () override { - database_->DecrementPriorityWork(); + void DoFinally (napi_env env) override { + database_->DecrementPriorityWork(env); } }; @@ -537,7 +537,7 @@ struct BaseIterator { dbIterator_ = database_->NewIterator(options_); } - ~BaseIterator () { + virtual ~BaseIterator () { assert(hasEnded_); if (lt_ != NULL) delete lt_; @@ -548,7 +548,7 @@ struct BaseIterator { delete options_; } - bool DidSeek () { + bool DidSeek () const { return didSeek_; } @@ -662,19 +662,19 @@ struct BaseIterator { else dbIterator_->Next(); } - leveldb::Slice CurrentKey () { + leveldb::Slice CurrentKey () const { return dbIterator_->key(); } - leveldb::Slice CurrentValue () { + leveldb::Slice CurrentValue () const { return dbIterator_->value(); } - leveldb::Status Status () { + leveldb::Status Status () const { return dbIterator_->status(); } - bool OutOfRange (leveldb::Slice& target) { + bool OutOfRange (const leveldb::Slice& target) { return ((lt_ != NULL && target.compare(*lt_) >= 0) || (lte_ != NULL && target.compare(*lte_) > 0) || (gt_ != NULL && target.compare(*gt_) <= 0) || @@ -732,21 +732,21 @@ struct Iterator final : public BaseIterator { ~Iterator () {} - void Attach (napi_ref ref) { - ref_ = ref; - database_->AttachIterator(id_, this); + void Attach (napi_env env, napi_value context) { + napi_create_reference(env, context, 1, &ref_); + database_->AttachIterator(env, id_, this); } - napi_ref Detach () { - database_->DetachIterator(id_); - return ref_; + void Detach (napi_env env) { + database_->DetachIterator(env, id_); + if (ref_ != NULL) napi_delete_reference(env, ref_); } - void CheckEndCallback () { + void CheckEndCallback (napi_env env) { nexting_ = false; if (endWorker_ != NULL) { - endWorker_->Queue(); + endWorker_->Queue(env); endWorker_ = NULL; } } @@ -793,7 +793,6 @@ struct Iterator final : public BaseIterator { uint32_t highWaterMark_; bool landed_; bool nexting_; - BaseWorker* endWorker_; private: @@ -818,6 +817,7 @@ static void env_cleanup_hook (void* arg) { std::map iterators = database->iterators_; std::map::iterator it; + // TODO: does not do `napi_delete_reference(env, iterator->ref_)`. Problem? for (it = iterators.begin(); it != iterators.end(); ++it) { it->second->End(); } @@ -834,6 +834,7 @@ static void FinalizeDatabase (napi_env env, void* data, void* hint) { if (data) { Database* database = (Database*)data; napi_remove_env_cleanup_hook(env, env_cleanup_hook, database); + if (database->ref_ != NULL) napi_delete_reference(env, database->ref_); delete database; } } @@ -842,7 +843,7 @@ static void FinalizeDatabase (napi_env env, void* data, void* hint) { * Returns a context object for a database. */ NAPI_METHOD(db_init) { - Database* database = new Database(env); + Database* database = new Database(); napi_add_env_cleanup_hook(env, env_cleanup_hook, database); napi_value result; @@ -898,7 +899,7 @@ struct OpenWorker final : public BaseWorker { else if (infoLogLevel == "error") lvl = rocksdb::InfoLogLevel::ERROR_LEVEL; else if (infoLogLevel == "fatal") lvl = rocksdb::InfoLogLevel::FATAL_LEVEL; else if (infoLogLevel == "header") lvl = rocksdb::InfoLogLevel::HEADER_LEVEL; - else napi_throw_error(env_, NULL, "invalid log level"); + else napi_throw_error(env, NULL, "invalid log level"); options_.info_log_level = lvl; } else { @@ -967,7 +968,7 @@ NAPI_METHOD(db_open) { maxOpenFiles, blockRestartInterval, maxFileSize, cacheSize, infoLogLevel, readOnly); - worker->Queue(); + worker->Queue(env); delete [] location; NAPI_RETURN_UNDEFINED(); @@ -1004,7 +1005,7 @@ NAPI_METHOD(db_close) { CloseWorker* worker = new CloseWorker(env, database, callback); if (!database->HasPriorityWork()) { - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1065,7 +1066,7 @@ NAPI_METHOD(db_put) { napi_value callback = argv[4]; PutWorker* worker = new PutWorker(env, database, callback, key, value, sync); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1094,19 +1095,19 @@ struct GetWorker final : public PriorityWorker { SetStatus(database_->Get(options_, key_, value_)); } - void HandleOKCallback () override { + void HandleOKCallback (napi_env env) override { napi_value argv[2]; - napi_get_null(env_, &argv[0]); + napi_get_null(env, &argv[0]); if (asBuffer_) { - napi_create_buffer_copy(env_, value_.size(), value_.data(), NULL, &argv[1]); + napi_create_buffer_copy(env, value_.size(), value_.data(), NULL, &argv[1]); } else { - napi_create_string_utf8(env_, value_.data(), value_.size(), &argv[1]); + napi_create_string_utf8(env, value_.data(), value_.size(), &argv[1]); } napi_value callback; - napi_get_reference_value(env_, callbackRef_, &callback); - CallFunction(env_, callback, 2, argv); + napi_get_reference_value(env, callbackRef_, &callback); + CallFunction(env, callback, 2, argv); } leveldb::ReadOptions options_; @@ -1130,7 +1131,7 @@ NAPI_METHOD(db_get) { GetWorker* worker = new GetWorker(env, database, callback, key, asBuffer, fillCache); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1173,7 +1174,7 @@ NAPI_METHOD(db_del) { napi_value callback = argv[3]; DelWorker* worker = new DelWorker(env, database, callback, key, sync); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1257,7 +1258,7 @@ NAPI_METHOD(db_clear) { std::string* gte = RangeOption(env, options, "gte"); ClearWorker* worker = new ClearWorker(env, database, callback, reverse, limit, lt, lte, gt, gte); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1284,13 +1285,13 @@ struct ApproximateSizeWorker final : public PriorityWorker { size_ = database_->ApproximateSize(&range); } - void HandleOKCallback () override { + void HandleOKCallback (napi_env env) override { napi_value argv[2]; - napi_get_null(env_, &argv[0]); - napi_create_int64(env_, (int64_t)size_, &argv[1]); + napi_get_null(env, &argv[0]); + napi_create_int64(env, (uint64_t)size_, &argv[1]); napi_value callback; - napi_get_reference_value(env_, callbackRef_, &callback); - CallFunction(env_, callback, 2, argv); + napi_get_reference_value(env, callbackRef_, &callback); + CallFunction(env, callback, 2, argv); } leveldb::Slice start_; @@ -1313,7 +1314,7 @@ NAPI_METHOD(db_approximate_size) { ApproximateSizeWorker* worker = new ApproximateSizeWorker(env, database, callback, start, end); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1356,7 +1357,7 @@ NAPI_METHOD(db_compact_range) { CompactRangeWorker* worker = new CompactRangeWorker(env, database, callback, start, end); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1415,7 +1416,7 @@ NAPI_METHOD(destroy_db) { napi_value callback = argv[1]; DestroyWorker* worker = new DestroyWorker(env, location, callback); - worker->Queue(); + worker->Queue(env); delete [] location; @@ -1456,7 +1457,7 @@ NAPI_METHOD(repair_db) { napi_value callback = argv[1]; RepairWorker* worker = new RepairWorker(env, location, callback); - worker->Queue(); + worker->Queue(env); delete [] location; @@ -1500,7 +1501,6 @@ NAPI_METHOD(iterator_init) { values, limit, lt, lte, gt, gte, fillCache, keyAsBuffer, valueAsBuffer, highWaterMark); napi_value result; - napi_ref ref; NAPI_STATUS_THROWS(napi_create_external(env, iterator, FinalizeIterator, @@ -1508,8 +1508,7 @@ NAPI_METHOD(iterator_init) { // Prevent GC of JS object before the iterator is ended (explicitly or on // db close) and keep track of non-ended iterators to end them on db close. - NAPI_STATUS_THROWS(napi_create_reference(env, result, 1, &ref)); - iterator->Attach(ref); + iterator->Attach(env, result); return result; } @@ -1549,10 +1548,10 @@ struct EndWorker final : public BaseWorker { iterator_->End(); } - void HandleOKCallback () override { - // TODO: if we don't use EndWorker, do we still delete the reference? - napi_delete_reference(env_, iterator_->Detach()); - BaseWorker::HandleOKCallback(); + void HandleOKCallback (napi_env env) override { + // TODO: would this be safe(r) to do in DoFinally() i.e. after we call the callback? + iterator_->Detach(env); + BaseWorker::HandleOKCallback(env); } Iterator* iterator_; @@ -1570,7 +1569,7 @@ static void iterator_end_do (napi_env env, Iterator* iterator, napi_value cb) { if (iterator->nexting_) { iterator->endWorker_ = worker; } else { - worker->Queue(); + worker->Queue(env); } } } @@ -1614,10 +1613,10 @@ struct NextWorker final : public BaseWorker { } } - void HandleOKCallback () override { + void HandleOKCallback (napi_env env) override { size_t arraySize = result_.size() * 2; napi_value jsArray; - napi_create_array_with_length(env_, arraySize, &jsArray); + napi_create_array_with_length(env, arraySize, &jsArray); for (size_t idx = 0; idx < result_.size(); ++idx) { std::pair row = result_[idx]; @@ -1626,33 +1625,34 @@ struct NextWorker final : public BaseWorker { napi_value returnKey; if (iterator_->keyAsBuffer_) { - napi_create_buffer_copy(env_, key.size(), key.data(), NULL, &returnKey); + napi_create_buffer_copy(env, key.size(), key.data(), NULL, &returnKey); } else { - napi_create_string_utf8(env_, key.data(), key.size(), &returnKey); + napi_create_string_utf8(env, key.data(), key.size(), &returnKey); } napi_value returnValue; if (iterator_->valueAsBuffer_) { - napi_create_buffer_copy(env_, value.size(), value.data(), NULL, &returnValue); + napi_create_buffer_copy(env, value.size(), value.data(), NULL, &returnValue); } else { - napi_create_string_utf8(env_, value.data(), value.size(), &returnValue); + napi_create_string_utf8(env, value.data(), value.size(), &returnValue); } // put the key & value in a descending order, so that they can be .pop:ed in javascript-land - napi_set_element(env_, jsArray, static_cast(arraySize - idx * 2 - 1), returnKey); - napi_set_element(env_, jsArray, static_cast(arraySize - idx * 2 - 2), returnValue); + napi_set_element(env, jsArray, static_cast(arraySize - idx * 2 - 1), returnKey); + napi_set_element(env, jsArray, static_cast(arraySize - idx * 2 - 2), returnValue); } // clean up & handle the next/end state - iterator_->CheckEndCallback(); + // TODO: always do this, even on error + iterator_->CheckEndCallback(env); napi_value argv[3]; - napi_get_null(env_, &argv[0]); + napi_get_null(env, &argv[0]); argv[1] = jsArray; - napi_get_boolean(env_, !ok_, &argv[2]); + napi_get_boolean(env, !ok_, &argv[2]); napi_value callback; - napi_get_reference_value(env_, callbackRef_, &callback); - CallFunction(env_, callback, 3, argv); + napi_get_reference_value(env, callbackRef_, &callback); + CallFunction(env, callback, 3, argv); } Iterator* iterator_; @@ -1678,7 +1678,7 @@ NAPI_METHOD(iterator_next) { NextWorker* worker = new NextWorker(env, iterator, callback); iterator->nexting_ = true; - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1762,7 +1762,7 @@ NAPI_METHOD(batch_do) { } BatchWorker* worker = new BatchWorker(env, database, callback, batch, sync, hasData); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1886,12 +1886,10 @@ struct BatchWriteWorker final : public PriorityWorker { batch_(batch), sync_(sync) { // Prevent GC of batch object before we execute - NAPI_STATUS_THROWS_VOID(napi_create_reference(env_, context, 1, &contextRef_)); + NAPI_STATUS_THROWS_VOID(napi_create_reference(env, context, 1, &contextRef_)); } - ~BatchWriteWorker () { - napi_delete_reference(env_, contextRef_); - } + ~BatchWriteWorker () {} void DoExecute () override { if (batch_->hasData_) { @@ -1899,6 +1897,11 @@ struct BatchWriteWorker final : public PriorityWorker { } } + void DoFinally (napi_env env) override { + napi_delete_reference(env, contextRef_); + PriorityWorker::DoFinally(env); + } + Batch* batch_; bool sync_; @@ -1918,7 +1921,7 @@ NAPI_METHOD(batch_write) { napi_value callback = argv[2]; BatchWriteWorker* worker = new BatchWriteWorker(env, argv[0], batch, callback, sync); - worker->Queue(); + worker->Queue(env); NAPI_RETURN_UNDEFINED(); } From 8d6906e86e68589483870694e3a1301448ceff17 Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Mon, 20 Sep 2021 14:50:19 +0200 Subject: [PATCH 6/9] Cleanup hanging iterator also when `next()` errored (cherry picked from commit Level/leveldown@7356ba43d3f7261c9b871e947d219e78586ccef2) --- binding.cc | 86 +++++++++++++------------- test/cleanup-hanging-iterators-test.js | 24 +++++++ 2 files changed, 66 insertions(+), 44 deletions(-) diff --git a/binding.cc b/binding.cc index a5ae461d..1c642b05 100644 --- a/binding.cc +++ b/binding.cc @@ -274,6 +274,7 @@ static napi_status CallFunction (napi_env env, * * - DoExecute (abstract, worker pool thread): main work * - HandleOKCallback (main thread): call JS callback on success + * - HandleErrorCallback (main thread): call JS callback on error * - DoFinally (main thread): do cleanup regardless of success */ struct BaseWorker { @@ -324,48 +325,52 @@ struct BaseWorker { } virtual void DoExecute () = 0; - virtual void DoFinally (napi_env env) {}; static void Complete (napi_env env, napi_status status, void* data) { BaseWorker* self = (BaseWorker*)data; self->DoComplete(env); self->DoFinally(env); - - napi_delete_reference(env, self->callbackRef_); - napi_delete_async_work(env, self->asyncWork_); - - delete self; } void DoComplete (napi_env env) { - if (status_.ok()) { - return HandleOKCallback(env); - } - - napi_value argv = CreateError(env, errMsg_); napi_value callback; napi_get_reference_value(env, callbackRef_, &callback); - CallFunction(env, callback, 1, &argv); + + if (status_.ok()) { + HandleOKCallback(env, callback); + } else { + HandleErrorCallback(env, callback); + } } - virtual void HandleOKCallback (napi_env env) { + virtual void HandleOKCallback (napi_env env, napi_value callback) { napi_value argv; napi_get_null(env, &argv); - napi_value callback; - napi_get_reference_value(env, callbackRef_, &callback); CallFunction(env, callback, 1, &argv); } + virtual void HandleErrorCallback (napi_env env, napi_value callback) { + napi_value argv = CreateError(env, errMsg_); + CallFunction(env, callback, 1, &argv); + } + + virtual void DoFinally (napi_env env) { + napi_delete_reference(env, callbackRef_); + napi_delete_async_work(env, asyncWork_); + + delete this; + } + void Queue (napi_env env) { napi_queue_async_work(env, asyncWork_); } - napi_ref callbackRef_; - napi_async_work asyncWork_; Database* database_; private: + napi_ref callbackRef_; + napi_async_work asyncWork_; leveldb::Status status_; char *errMsg_; }; @@ -503,6 +508,7 @@ struct PriorityWorker : public BaseWorker { void DoFinally (napi_env env) override { database_->DecrementPriorityWork(env); + BaseWorker::DoFinally(env); } }; @@ -519,7 +525,6 @@ struct BaseIterator { int limit, bool fillCache) : database_(database), - isEnding_(false), hasEnded_(false), didSeek_(false), reverse_(reverse), @@ -682,7 +687,6 @@ struct BaseIterator { } Database* database_; - bool isEnding_; bool hasEnded_; private: @@ -726,6 +730,7 @@ struct Iterator final : public BaseIterator { highWaterMark_(highWaterMark), landed_(false), nexting_(false), + isEnding_(false), endWorker_(NULL), ref_(NULL) { } @@ -742,15 +747,6 @@ struct Iterator final : public BaseIterator { if (ref_ != NULL) napi_delete_reference(env, ref_); } - void CheckEndCallback (napi_env env) { - nexting_ = false; - - if (endWorker_ != NULL) { - endWorker_->Queue(env); - endWorker_ = NULL; - } - } - bool ReadMany (uint32_t size, std::vector>& result) { size_t bytesRead = 0; @@ -793,6 +789,7 @@ struct Iterator final : public BaseIterator { uint32_t highWaterMark_; bool landed_; bool nexting_; + bool isEnding_; BaseWorker* endWorker_; private: @@ -1095,7 +1092,7 @@ struct GetWorker final : public PriorityWorker { SetStatus(database_->Get(options_, key_, value_)); } - void HandleOKCallback (napi_env env) override { + void HandleOKCallback (napi_env env, napi_value callback) override { napi_value argv[2]; napi_get_null(env, &argv[0]); @@ -1105,8 +1102,6 @@ struct GetWorker final : public PriorityWorker { napi_create_string_utf8(env, value_.data(), value_.size(), &argv[1]); } - napi_value callback; - napi_get_reference_value(env, callbackRef_, &callback); CallFunction(env, callback, 2, argv); } @@ -1285,12 +1280,10 @@ struct ApproximateSizeWorker final : public PriorityWorker { size_ = database_->ApproximateSize(&range); } - void HandleOKCallback (napi_env env) override { + void HandleOKCallback (napi_env env, napi_value callback) override { napi_value argv[2]; napi_get_null(env, &argv[0]); napi_create_int64(env, (uint64_t)size_, &argv[1]); - napi_value callback; - napi_get_reference_value(env, callbackRef_, &callback); CallFunction(env, callback, 2, argv); } @@ -1548,10 +1541,9 @@ struct EndWorker final : public BaseWorker { iterator_->End(); } - void HandleOKCallback (napi_env env) override { - // TODO: would this be safe(r) to do in DoFinally() i.e. after we call the callback? + void DoFinally (napi_env env) override { iterator_->Detach(env); - BaseWorker::HandleOKCallback(env); + BaseWorker::DoFinally(env); } Iterator* iterator_; @@ -1613,7 +1605,7 @@ struct NextWorker final : public BaseWorker { } } - void HandleOKCallback (napi_env env) override { + void HandleOKCallback (napi_env env, napi_value callback) override { size_t arraySize = result_.size() * 2; napi_value jsArray; napi_create_array_with_length(env, arraySize, &jsArray); @@ -1642,19 +1634,25 @@ struct NextWorker final : public BaseWorker { napi_set_element(env, jsArray, static_cast(arraySize - idx * 2 - 2), returnValue); } - // clean up & handle the next/end state - // TODO: always do this, even on error - iterator_->CheckEndCallback(env); - napi_value argv[3]; napi_get_null(env, &argv[0]); argv[1] = jsArray; napi_get_boolean(env, !ok_, &argv[2]); - napi_value callback; - napi_get_reference_value(env, callbackRef_, &callback); CallFunction(env, callback, 3, argv); } + void DoFinally (napi_env env) override { + // clean up & handle the next/end state + iterator_->nexting_ = false; + + if (iterator_->endWorker_ != NULL) { + iterator_->endWorker_->Queue(env); + iterator_->endWorker_ = NULL; + } + + BaseWorker::DoFinally(env); + } + Iterator* iterator_; std::vector > result_; bool ok_; diff --git a/test/cleanup-hanging-iterators-test.js b/test/cleanup-hanging-iterators-test.js index 89578a28..535977ee 100644 --- a/test/cleanup-hanging-iterators-test.js +++ b/test/cleanup-hanging-iterators-test.js @@ -92,3 +92,27 @@ makeTest('test ending iterators', function (db, t, done) { done() }) }) + +makeTest('test recursive next', function (db, t, done) { + // Test that we're able to close when user keeps scheduling work + const it = db.iterator({ highWaterMark: 0 }) + + it.next(function loop (err, key) { + if (err && err.message !== 'iterator has ended') throw err + if (key !== undefined) it.next(loop) + }) + + done() +}) + +makeTest('test recursive next (random)', function (db, t, done) { + // Same as the test above but closing at a random time + const it = db.iterator({ highWaterMark: 0 }) + + it.next(function loop (err, key) { + if (err && err.message !== 'iterator has ended') throw err + if (key !== undefined) it.next(loop) + }) + + setTimeout(done, Math.floor(Math.random() * 50)) +}) From db7060db0a617baebe25ce1e98cdaf48c227250c Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Mon, 20 Sep 2021 20:48:25 +0200 Subject: [PATCH 7/9] Optimize `db.iterator()` By using `emplace_back()`, reusing the `std::vector` cache between `iterator.next()` calls, and not advancing the iterator prematurely. That last one only matters for single reads (i.e. the first `next()` call or one made after seeking) and it doesn't improve performance compared to the previous release, just undoes a mistake in Level/leveldown@b815bea. (cherry picked from commit Level/leveldown@112906b36be181e587a0b8f72a0584ff6e38e378) --- binding.cc | 136 ++++++++++++++++++++++++++--------------------------- 1 file changed, 67 insertions(+), 69 deletions(-) diff --git a/binding.cc b/binding.cc index 1c642b05..45f6ba68 100644 --- a/binding.cc +++ b/binding.cc @@ -481,7 +481,7 @@ struct Database { } } - bool HasPriorityWork () { + bool HasPriorityWork () const { return priorityWork_ > 0; } @@ -533,8 +533,7 @@ struct BaseIterator { gt_(gt), gte_(gte), limit_(limit), - count_(0), - eof_(false) { + count_(0) { options_ = new leveldb::ReadOptions(); options_->fill_cache = fillCache; options_->verify_checksums = false; @@ -601,40 +600,22 @@ struct BaseIterator { didSeek_ = true; if (OutOfRange(target)) { - if (reverse_) { - dbIterator_->SeekToFirst(); - dbIterator_->Prev(); - } else { - dbIterator_->SeekToLast(); - dbIterator_->Next(); - } - - return; + return SeekToEnd(); } dbIterator_->Seek(target); if (dbIterator_->Valid()) { int cmp = dbIterator_->key().compare(target); - if (cmp > 0 && reverse_) { - dbIterator_->Prev(); - } else if (cmp < 0 && !reverse_) { - dbIterator_->Next(); + if (reverse_ ? cmp > 0 : cmp < 0) { + Next(); } } else { - if (reverse_) { - dbIterator_->SeekToLast(); - } else { - dbIterator_->SeekToFirst(); - } + SeekToFirst(); if (dbIterator_->Valid()) { int cmp = dbIterator_->key().compare(target); - if (cmp > 0 && reverse_) { - dbIterator_->SeekToFirst(); - dbIterator_->Prev(); - } else if (cmp < 0 && !reverse_) { - dbIterator_->SeekToLast(); - dbIterator_->Next(); + if (reverse_ ? cmp > 0 : cmp < 0) { + SeekToEnd(); } } } @@ -649,24 +630,34 @@ struct BaseIterator { } } - bool ReadOne () { - if (eof_ || !dbIterator_->Valid()) { - return false; - } - - if ((limit_ >= 0 && ++count_ > limit_) || OutOfRange(dbIterator_->key())) { - eof_ = true; - return false; - } + bool Valid () const { + return dbIterator_->Valid() && !OutOfRange(dbIterator_->key()); + } - return true; + bool Increment () { + return limit_ < 0 || ++count_ <= limit_; } - void Advance () { + void Next () { if (reverse_) dbIterator_->Prev(); else dbIterator_->Next(); } + void SeekToFirst () { + if (reverse_) dbIterator_->SeekToLast(); + else dbIterator_->SeekToFirst(); + } + + void SeekToLast () { + if (reverse_) dbIterator_->SeekToFirst(); + else dbIterator_->SeekToLast(); + } + + void SeekToEnd () { + SeekToLast(); + Next(); + } + leveldb::Slice CurrentKey () const { return dbIterator_->key(); } @@ -679,7 +670,13 @@ struct BaseIterator { return dbIterator_->status(); } - bool OutOfRange (const leveldb::Slice& target) { + bool OutOfRange (const leveldb::Slice& target) const { + // TODO: benchmark to see if this is worth it + // if (upperBoundOnly && !reverse_) { + // return ((lt_ != NULL && target.compare(*lt_) >= 0) || + // (lte_ != NULL && target.compare(*lte_) > 0)); + // } + return ((lt_ != NULL && target.compare(*lt_) >= 0) || (lte_ != NULL && target.compare(*lte_) > 0) || (gt_ != NULL && target.compare(*gt_) <= 0) || @@ -699,7 +696,6 @@ struct BaseIterator { std::string* gte_; int limit_; int count_; - bool eof_; leveldb::ReadOptions* options_; }; @@ -747,33 +743,36 @@ struct Iterator final : public BaseIterator { if (ref_ != NULL) napi_delete_reference(env, ref_); } - bool ReadMany (uint32_t size, std::vector>& result) { + bool ReadMany (uint32_t size) { + cache_.clear(); size_t bytesRead = 0; - while (ReadOne()) { - std::string key, value; + while (true) { + if (landed_) Next(); + if (!Valid() || !Increment()) break; if (keys_) { leveldb::Slice slice = CurrentKey(); - key.assign(slice.data(), slice.size()); - bytesRead += key.size(); + cache_.emplace_back(slice.data(), slice.size()); + bytesRead += slice.size(); + } else { + cache_.emplace_back(""); } if (values_) { leveldb::Slice slice = CurrentValue(); - value.assign(slice.data(), slice.size()); - bytesRead += value.size(); + cache_.emplace_back(slice.data(), slice.size()); + bytesRead += slice.size(); + } else { + cache_.emplace_back(""); } - Advance(); - result.push_back(std::make_pair(key, value)); - if (!landed_) { landed_ = true; return true; } - if (bytesRead > highWaterMark_ || result.size() >= size) { + if (bytesRead > highWaterMark_ || cache_.size() >= size * 2) { return true; } } @@ -791,6 +790,7 @@ struct Iterator final : public BaseIterator { bool nexting_; bool isEnding_; BaseWorker* endWorker_; + std::vector cache_; private: napi_ref ref_; @@ -1188,18 +1188,18 @@ struct ClearWorker final : public PriorityWorker { std::string* gt, std::string* gte) : PriorityWorker(env, database, callback, "leveldown.db.clear") { - baseIterator_ = new BaseIterator(database, reverse, lt, lte, gt, gte, limit, false); + iterator_ = new BaseIterator(database, reverse, lt, lte, gt, gte, limit, false); writeOptions_ = new leveldb::WriteOptions(); writeOptions_->sync = false; } ~ClearWorker () { - delete baseIterator_; + delete iterator_; delete writeOptions_; } void DoExecute () override { - baseIterator_->SeekToRange(); + iterator_->SeekToRange(); // TODO: add option uint32_t hwm = 16 * 1024; @@ -1208,14 +1208,14 @@ struct ClearWorker final : public PriorityWorker { while (true) { size_t bytesRead = 0; - while (bytesRead < hwm && baseIterator_->ReadOne()) { - leveldb::Slice key = baseIterator_->CurrentKey(); + while (bytesRead <= hwm && iterator_->Valid() && iterator_->Increment()) { + leveldb::Slice key = iterator_->CurrentKey(); batch.Delete(key); bytesRead += key.size(); - baseIterator_->Advance(); + iterator_->Next(); } - if (!SetStatus(baseIterator_->Status()) || bytesRead == 0) { + if (!SetStatus(iterator_->Status()) || bytesRead == 0) { break; } @@ -1226,11 +1226,11 @@ struct ClearWorker final : public PriorityWorker { batch.Clear(); } - baseIterator_->End(); + iterator_->End(); } private: - BaseIterator* baseIterator_; + BaseIterator* iterator_; leveldb::WriteOptions* writeOptions_; }; @@ -1598,7 +1598,7 @@ struct NextWorker final : public BaseWorker { // Limit the size of the cache to prevent starving the event loop // in JS-land while we're recursively calling process.nextTick(). - ok_ = iterator_->ReadMany(1000, result_); + ok_ = iterator_->ReadMany(1000); if (!ok_) { SetStatus(iterator_->Status()); @@ -1606,14 +1606,13 @@ struct NextWorker final : public BaseWorker { } void HandleOKCallback (napi_env env, napi_value callback) override { - size_t arraySize = result_.size() * 2; + size_t arraySize = iterator_->cache_.size(); napi_value jsArray; napi_create_array_with_length(env, arraySize, &jsArray); - for (size_t idx = 0; idx < result_.size(); ++idx) { - std::pair row = result_[idx]; - std::string key = row.first; - std::string value = row.second; + for (size_t idx = 0; idx < iterator_->cache_.size(); idx += 2) { + std::string key = iterator_->cache_[idx]; + std::string value = iterator_->cache_[idx + 1]; napi_value returnKey; if (iterator_->keyAsBuffer_) { @@ -1630,8 +1629,8 @@ struct NextWorker final : public BaseWorker { } // put the key & value in a descending order, so that they can be .pop:ed in javascript-land - napi_set_element(env, jsArray, static_cast(arraySize - idx * 2 - 1), returnKey); - napi_set_element(env, jsArray, static_cast(arraySize - idx * 2 - 2), returnValue); + napi_set_element(env, jsArray, static_cast(arraySize - idx - 1), returnKey); + napi_set_element(env, jsArray, static_cast(arraySize - idx - 2), returnValue); } napi_value argv[3]; @@ -1654,7 +1653,6 @@ struct NextWorker final : public BaseWorker { } Iterator* iterator_; - std::vector > result_; bool ok_; }; From 0c1f8d8a1b212a275f0000bcf6a3d72c44e81eca Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Fri, 24 Sep 2021 18:02:12 +0200 Subject: [PATCH 8/9] Refactor: add Entry struct to abstract away key-value pairs And add `const` and `private` where appropriate. (cherry picked from commit Level/leveldown@576d1355ac07d012c584aa42c98f893c5256573f) --- binding.cc | 219 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 133 insertions(+), 86 deletions(-) diff --git a/binding.cc b/binding.cc index 45f6ba68..508a9cfb 100644 --- a/binding.cc +++ b/binding.cc @@ -268,6 +268,64 @@ static napi_status CallFunction (napi_env env, return napi_call_function(env, global, callback, argc, argv, NULL); } +/** + * Whether to yield entries, keys or values. + */ +enum Mode { + entries, + keys, + values +}; + +/** + * Helper struct for caching and converting a key-value pair to napi_values. + */ +struct Entry { + Entry (const leveldb::Slice* key, const leveldb::Slice* value) { + key_ = key != NULL ? new std::string(key->data(), key->size()) : NULL; + value_ = value != NULL ? new std::string(value->data(), value->size()) : NULL; + } + + ~Entry () { + if (key_ != NULL) delete key_; + if (value_ != NULL) delete value_; + } + + // Not used yet. + void ConvertXX (napi_env env, Mode mode, bool keyAsBuffer, bool valueAsBuffer, napi_value* result) { + if (mode == Mode::entries) { + napi_create_array_with_length(env, 2, result); + + napi_value valueElement; + napi_value keyElement; + + Convert(env, key_, keyAsBuffer, &keyElement); + Convert(env, value_, valueAsBuffer, &valueElement); + + napi_set_element(env, *result, 0, keyElement); + napi_set_element(env, *result, 1, valueElement); + } else if (mode == Mode::keys) { + Convert(env, key_, keyAsBuffer, result); + } else { + Convert(env, value_, valueAsBuffer, result); + } + } + + static void Convert (napi_env env, const std::string* s, bool asBuffer, napi_value* result) { + if (s == NULL) { + napi_get_undefined(env, result); + } else if (asBuffer) { + napi_create_buffer_copy(env, s->size(), s->data(), NULL, result); + } else { + napi_create_string_utf8(env, s->data(), s->size(), result); + } + } + +private: + std::string* key_; + std::string* value_; +}; + /** * Base worker class. Handles the async work. Derived classes can override the * following virtual methods (listed in the order in which they're called): @@ -517,13 +575,13 @@ struct PriorityWorker : public BaseWorker { */ struct BaseIterator { BaseIterator(Database* database, - bool reverse, + const bool reverse, std::string* lt, std::string* lte, std::string* gt, std::string* gte, - int limit, - bool fillCache) + const int limit, + const bool fillCache) : database_(database), hasEnded_(false), didSeek_(false), @@ -689,12 +747,12 @@ struct BaseIterator { private: leveldb::Iterator* dbIterator_; bool didSeek_; - bool reverse_; + const bool reverse_; std::string* lt_; std::string* lte_; std::string* gt_; std::string* gte_; - int limit_; + const int limit_; int count_; leveldb::ReadOptions* options_; }; @@ -704,19 +762,19 @@ struct BaseIterator { */ struct Iterator final : public BaseIterator { Iterator (Database* database, - uint32_t id, - bool reverse, - bool keys, - bool values, - int limit, + const uint32_t id, + const bool reverse, + const bool keys, + const bool values, + const int limit, std::string* lt, std::string* lte, std::string* gt, std::string* gte, - bool fillCache, - bool keyAsBuffer, - bool valueAsBuffer, - uint32_t highWaterMark) + const bool fillCache, + const bool keyAsBuffer, + const bool valueAsBuffer, + const uint32_t highWaterMark) : BaseIterator(database, reverse, lt, lte, gt, gte, limit, fillCache), id_(id), keys_(keys), @@ -780,12 +838,12 @@ struct Iterator final : public BaseIterator { return false; } - uint32_t id_; - bool keys_; - bool values_; - bool keyAsBuffer_; - bool valueAsBuffer_; - uint32_t highWaterMark_; + const uint32_t id_; + const bool keys_; + const bool values_; + const bool keyAsBuffer_; + const bool valueAsBuffer_; + const uint32_t highWaterMark_; bool landed_; bool nexting_; bool isEnding_; @@ -863,17 +921,17 @@ struct OpenWorker final : public BaseWorker { Database* database, napi_value callback, const std::string& location, - bool createIfMissing, - bool errorIfExists, - bool compression, - uint32_t writeBufferSize, - uint32_t blockSize, - uint32_t maxOpenFiles, - uint32_t blockRestartInterval, - uint32_t maxFileSize, - uint32_t cacheSize, + const bool createIfMissing, + const bool errorIfExists, + const bool compression, + const uint32_t writeBufferSize, + const uint32_t blockSize, + const uint32_t maxOpenFiles, + const uint32_t blockRestartInterval, + const uint32_t maxFileSize, + const uint32_t cacheSize, const std::string& infoLogLevel, - bool readOnly) + const bool readOnly) : BaseWorker(env, database, callback, "leveldown.db.open"), readOnly_(readOnly), location_(location) { @@ -943,20 +1001,20 @@ NAPI_METHOD(db_open) { NAPI_ARGV_UTF8_NEW(location, 1); napi_value options = argv[2]; - bool createIfMissing = BooleanProperty(env, options, "createIfMissing", true); - bool errorIfExists = BooleanProperty(env, options, "errorIfExists", false); - bool compression = BooleanProperty(env, options, "compression", true); + const bool createIfMissing = BooleanProperty(env, options, "createIfMissing", true); + const bool errorIfExists = BooleanProperty(env, options, "errorIfExists", false); + const bool compression = BooleanProperty(env, options, "compression", true); bool readOnly = BooleanProperty(env, options, "readOnly", false); - std::string infoLogLevel = StringProperty(env, options, "infoLogLevel"); + const std::string infoLogLevel = StringProperty(env, options, "infoLogLevel"); - uint32_t cacheSize = Uint32Property(env, options, "cacheSize", 8 << 20); - uint32_t writeBufferSize = Uint32Property(env, options , "writeBufferSize" , 4 << 20); - uint32_t blockSize = Uint32Property(env, options, "blockSize", 4096); - uint32_t maxOpenFiles = Uint32Property(env, options, "maxOpenFiles", 1000); - uint32_t blockRestartInterval = Uint32Property(env, options, + const uint32_t cacheSize = Uint32Property(env, options, "cacheSize", 8 << 20); + const uint32_t writeBufferSize = Uint32Property(env, options , "writeBufferSize" , 4 << 20); + const uint32_t blockSize = Uint32Property(env, options, "blockSize", 4096); + const uint32_t maxOpenFiles = Uint32Property(env, options, "maxOpenFiles", 1000); + const uint32_t blockRestartInterval = Uint32Property(env, options, "blockRestartInterval", 16); - uint32_t maxFileSize = Uint32Property(env, options, "maxFileSize", 2 << 20); + const uint32_t maxFileSize = Uint32Property(env, options, "maxFileSize", 2 << 20); napi_value callback = argv[3]; OpenWorker* worker = new OpenWorker(env, database, callback, location, @@ -1076,8 +1134,8 @@ struct GetWorker final : public PriorityWorker { Database* database, napi_value callback, leveldb::Slice key, - bool asBuffer, - bool fillCache) + const bool asBuffer, + const bool fillCache) : PriorityWorker(env, database, callback, "leveldown.db.get"), key_(key), asBuffer_(asBuffer) { @@ -1095,20 +1153,15 @@ struct GetWorker final : public PriorityWorker { void HandleOKCallback (napi_env env, napi_value callback) override { napi_value argv[2]; napi_get_null(env, &argv[0]); - - if (asBuffer_) { - napi_create_buffer_copy(env, value_.size(), value_.data(), NULL, &argv[1]); - } else { - napi_create_string_utf8(env, value_.data(), value_.size(), &argv[1]); - } - + Entry::Convert(env, &value_, asBuffer_, &argv[1]); CallFunction(env, callback, 2, argv); } +private: leveldb::ReadOptions options_; leveldb::Slice key_; std::string value_; - bool asBuffer_; + const bool asBuffer_; }; /** @@ -1120,8 +1173,8 @@ NAPI_METHOD(db_get) { leveldb::Slice key = ToSlice(env, argv[1]); napi_value options = argv[2]; - bool asBuffer = BooleanProperty(env, options, "asBuffer", true); - bool fillCache = BooleanProperty(env, options, "fillCache", true); + const bool asBuffer = BooleanProperty(env, options, "asBuffer", true); + const bool fillCache = BooleanProperty(env, options, "fillCache", true); napi_value callback = argv[3]; GetWorker* worker = new GetWorker(env, database, callback, key, asBuffer, @@ -1181,8 +1234,8 @@ struct ClearWorker final : public PriorityWorker { ClearWorker (napi_env env, Database* database, napi_value callback, - bool reverse, - int limit, + const bool reverse, + const int limit, std::string* lt, std::string* lte, std::string* gt, @@ -1244,8 +1297,8 @@ NAPI_METHOD(db_clear) { napi_value options = argv[1]; napi_value callback = argv[2]; - bool reverse = BooleanProperty(env, options, "reverse", false); - int limit = Int32Property(env, options, "limit", -1); + const bool reverse = BooleanProperty(env, options, "reverse", false); + const int limit = Int32Property(env, options, "limit", -1); std::string* lt = RangeOption(env, options, "lt"); std::string* lte = RangeOption(env, options, "lte"); @@ -1474,14 +1527,14 @@ NAPI_METHOD(iterator_init) { NAPI_DB_CONTEXT(); napi_value options = argv[1]; - bool reverse = BooleanProperty(env, options, "reverse", false); - bool keys = BooleanProperty(env, options, "keys", true); - bool values = BooleanProperty(env, options, "values", true); - bool fillCache = BooleanProperty(env, options, "fillCache", false); - bool keyAsBuffer = BooleanProperty(env, options, "keyAsBuffer", true); - bool valueAsBuffer = BooleanProperty(env, options, "valueAsBuffer", true); - int limit = Int32Property(env, options, "limit", -1); - uint32_t highWaterMark = Uint32Property(env, options, "highWaterMark", + const bool reverse = BooleanProperty(env, options, "reverse", false); + const bool keys = BooleanProperty(env, options, "keys", true); + const bool values = BooleanProperty(env, options, "values", true); + const bool fillCache = BooleanProperty(env, options, "fillCache", false); + const bool keyAsBuffer = BooleanProperty(env, options, "keyAsBuffer", true); + const bool valueAsBuffer = BooleanProperty(env, options, "valueAsBuffer", true); + const int limit = Int32Property(env, options, "limit", -1); + const uint32_t highWaterMark = Uint32Property(env, options, "highWaterMark", 16 * 1024); std::string* lt = RangeOption(env, options, "lt"); @@ -1489,7 +1542,7 @@ NAPI_METHOD(iterator_init) { std::string* gt = RangeOption(env, options, "gt"); std::string* gte = RangeOption(env, options, "gte"); - uint32_t id = database->currentIteratorId_++; + const uint32_t id = database->currentIteratorId_++; Iterator* iterator = new Iterator(database, id, reverse, keys, values, limit, lt, lte, gt, gte, fillCache, keyAsBuffer, valueAsBuffer, highWaterMark); @@ -1546,6 +1599,7 @@ struct EndWorker final : public BaseWorker { BaseWorker::DoFinally(env); } +private: Iterator* iterator_; }; @@ -1587,7 +1641,7 @@ struct NextWorker final : public BaseWorker { napi_value callback) : BaseWorker(env, iterator->database_, callback, "leveldown.iterator.next"), - iterator_(iterator) {} + iterator_(iterator), ok_() {} ~NextWorker () {} @@ -1615,18 +1669,10 @@ struct NextWorker final : public BaseWorker { std::string value = iterator_->cache_[idx + 1]; napi_value returnKey; - if (iterator_->keyAsBuffer_) { - napi_create_buffer_copy(env, key.size(), key.data(), NULL, &returnKey); - } else { - napi_create_string_utf8(env, key.data(), key.size(), &returnKey); - } - napi_value returnValue; - if (iterator_->valueAsBuffer_) { - napi_create_buffer_copy(env, value.size(), value.data(), NULL, &returnValue); - } else { - napi_create_string_utf8(env, value.data(), value.size(), &returnValue); - } + + Entry::Convert(env, &key, iterator_->keyAsBuffer_, &returnKey); + Entry::Convert(env, &value, iterator_->valueAsBuffer_, &returnValue); // put the key & value in a descending order, so that they can be .pop:ed in javascript-land napi_set_element(env, jsArray, static_cast(arraySize - idx - 1), returnKey); @@ -1652,6 +1698,7 @@ struct NextWorker final : public BaseWorker { BaseWorker::DoFinally(env); } +private: Iterator* iterator_; bool ok_; }; @@ -1687,8 +1734,8 @@ struct BatchWorker final : public PriorityWorker { Database* database, napi_value callback, leveldb::WriteBatch* batch, - bool sync, - bool hasData) + const bool sync, + const bool hasData) : PriorityWorker(env, database, callback, "leveldown.batch.do"), batch_(batch), hasData_(hasData) { options_.sync = sync; @@ -1704,9 +1751,10 @@ struct BatchWorker final : public PriorityWorker { } } +private: leveldb::WriteOptions options_; leveldb::WriteBatch* batch_; - bool hasData_; + const bool hasData_; }; /** @@ -1717,7 +1765,7 @@ NAPI_METHOD(batch_do) { NAPI_DB_CONTEXT(); napi_value array = argv[1]; - bool sync = BooleanProperty(env, argv[2], "sync", false); + const bool sync = BooleanProperty(env, argv[2], "sync", false); napi_value callback = argv[3]; uint32_t length; @@ -1877,7 +1925,7 @@ struct BatchWriteWorker final : public PriorityWorker { napi_value context, Batch* batch, napi_value callback, - bool sync) + const bool sync) : PriorityWorker(env, batch->database_, callback, "leveldown.batch.write"), batch_(batch), sync_(sync) { @@ -1898,10 +1946,9 @@ struct BatchWriteWorker final : public PriorityWorker { PriorityWorker::DoFinally(env); } - Batch* batch_; - bool sync_; - private: + Batch* batch_; + const bool sync_; napi_ref contextRef_; }; @@ -1913,7 +1960,7 @@ NAPI_METHOD(batch_write) { NAPI_BATCH_CONTEXT(); napi_value options = argv[1]; - bool sync = BooleanProperty(env, options, "sync", false); + const bool sync = BooleanProperty(env, options, "sync", false); napi_value callback = argv[2]; BatchWriteWorker* worker = new BatchWriteWorker(env, argv[0], batch, callback, sync); From 17d4116f1f8923f3495147672a9e3215a17b3003 Mon Sep 17 00:00:00 2001 From: Vincent Weevers Date: Tue, 28 Sep 2021 22:52:55 +0200 Subject: [PATCH 9/9] Add `db.getMany(keys)` (cherry picked from commit Level/leveldown@50dc50bf005c70b024fe4d3add369b236c8dcbb9) --- binding.cc | 118 +++++++++++++++++++++++++++++++++++++++++++++++++ leveldown.js | 5 +++ package.json | 2 +- test/common.js | 5 ++- 4 files changed, 127 insertions(+), 3 deletions(-) diff --git a/binding.cc b/binding.cc index 508a9cfb..b2f8660c 100644 --- a/binding.cc +++ b/binding.cc @@ -256,6 +256,31 @@ static std::string* RangeOption (napi_env env, napi_value opts, const char* name return NULL; } +/** + * Converts an array containing Buffer or string keys to a vector. + * Empty elements are skipped. + */ +static std::vector* KeyArray (napi_env env, napi_value arr) { + uint32_t length; + std::vector* result = new std::vector(); + + if (napi_get_array_length(env, arr, &length) == napi_ok) { + result->reserve(length); + + for (uint32_t i = 0; i < length; i++) { + napi_value element; + + if (napi_get_element(env, arr, i, &element) == napi_ok && + StringOrBufferLength(env, element) > 0) { + LD_STRING_OR_BUFFER_TO_COPY(env, element, to); + result->emplace_back(toCh_, toSz_); + } + } + } + + return result; +} + /** * Calls a function. */ @@ -1184,6 +1209,98 @@ NAPI_METHOD(db_get) { NAPI_RETURN_UNDEFINED(); } +/** + * Worker class for getting many values. + */ +struct GetManyWorker final : public PriorityWorker { + GetManyWorker (napi_env env, + Database* database, + const std::vector* keys, + napi_value callback, + const bool valueAsBuffer, + const bool fillCache) + : PriorityWorker(env, database, callback, "leveldown.get.many"), + keys_(keys), valueAsBuffer_(valueAsBuffer) { + options_.fill_cache = fillCache; + options_.snapshot = database->NewSnapshot(); + } + + ~GetManyWorker() { + delete keys_; + } + + void DoExecute () override { + cache_.reserve(keys_->size()); + + for (const std::string& key: *keys_) { + std::string* value = new std::string(); + leveldb::Status status = database_->Get(options_, key, *value); + + if (status.ok()) { + cache_.push_back(value); + } else if (status.IsNotFound()) { + delete value; + cache_.push_back(NULL); + } else { + delete value; + for (const std::string* value: cache_) { + if (value != NULL) delete value; + } + SetStatus(status); + break; + } + } + + database_->ReleaseSnapshot(options_.snapshot); + } + + void HandleOKCallback (napi_env env, napi_value callback) override { + size_t size = cache_.size(); + napi_value array; + napi_create_array_with_length(env, size, &array); + + for (size_t idx = 0; idx < size; idx++) { + std::string* value = cache_[idx]; + napi_value element; + Entry::Convert(env, value, valueAsBuffer_, &element); + napi_set_element(env, array, static_cast(idx), element); + if (value != NULL) delete value; + } + + napi_value argv[2]; + napi_get_null(env, &argv[0]); + argv[1] = array; + CallFunction(env, callback, 2, argv); + } + +private: + leveldb::ReadOptions options_; + const std::vector* keys_; + const bool valueAsBuffer_; + std::vector cache_; +}; + +/** + * Gets many values from a database. + */ +NAPI_METHOD(db_get_many) { + NAPI_ARGV(4); + NAPI_DB_CONTEXT(); + + const std::vector* keys = KeyArray(env, argv[1]); + napi_value options = argv[2]; + const bool asBuffer = BooleanProperty(env, options, "asBuffer", true); + const bool fillCache = BooleanProperty(env, options, "fillCache", true); + napi_value callback = argv[3]; + + GetManyWorker* worker = new GetManyWorker( + env, database, keys, callback, asBuffer, fillCache + ); + + worker->Queue(env); + NAPI_RETURN_UNDEFINED(); +} + /** * Worker class for deleting a value from a database. */ @@ -1978,6 +2095,7 @@ NAPI_INIT() { NAPI_EXPORT_FUNCTION(db_close); NAPI_EXPORT_FUNCTION(db_put); NAPI_EXPORT_FUNCTION(db_get); + NAPI_EXPORT_FUNCTION(db_get_many); NAPI_EXPORT_FUNCTION(db_del); NAPI_EXPORT_FUNCTION(db_clear); NAPI_EXPORT_FUNCTION(db_approximate_size); diff --git a/leveldown.js b/leveldown.js index 226a518c..11b2dfc3 100644 --- a/leveldown.js +++ b/leveldown.js @@ -21,6 +21,7 @@ function LevelDOWN (location) { permanence: true, seek: true, clear: true, + getMany: true, createIfMissing: true, errorIfExists: true, additionalMethods: { @@ -59,6 +60,10 @@ LevelDOWN.prototype._get = function (key, options, callback) { binding.db_get(this.context, key, options, callback) } +LevelDOWN.prototype._getMany = function (keys, options, callback) { + binding.db_get_many(this.context, keys, options, callback) +} + LevelDOWN.prototype._del = function (key, options, callback) { binding.db_del(this.context, key, options, callback) } diff --git a/package.json b/package.json index 06634601..2d304160 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,7 @@ "prepublishOnly": "npm run dependency-check" }, "dependencies": { - "abstract-leveldown": "^7.0.0", + "abstract-leveldown": "^7.2.0", "napi-macros": "^2.0.0", "node-gyp-build": "^4.3.0" }, diff --git a/test/common.js b/test/common.js index de2612f2..4098b136 100644 --- a/test/common.js +++ b/test/common.js @@ -9,6 +9,7 @@ module.exports = suite.common({ return leveldown(tempy.directory()) }, - // Opt-in to new clear() tests - clear: true + // Opt-in to new tests + clear: true, + getMany: true })