From e31c90e63146a7da90626a3042a958b88fc8ea99 Mon Sep 17 00:00:00 2001 From: Bart Louwers Date: Wed, 10 Sep 2025 23:21:19 +0200 Subject: [PATCH 1/6] sqlite: fix crash session extension callbacks with workers --- src/node_sqlite.cc | 28 ++++++----- test/parallel/test-sqlite-session.js | 74 ++++++++++++++++++++++++++++ test/parallel/test-sqlite.js | 10 +--- test/sqlite/next-db.js | 14 ++++++ test/sqlite/worker.js | 24 +++++++++ 5 files changed, 128 insertions(+), 22 deletions(-) create mode 100644 test/sqlite/next-db.js create mode 100644 test/sqlite/worker.js diff --git a/src/node_sqlite.cc b/src/node_sqlite.cc index 924ed5789ba364..d9557f004ac80b 100644 --- a/src/node_sqlite.cc +++ b/src/node_sqlite.cc @@ -1647,26 +1647,28 @@ void Backup(const FunctionCallbackInfo& args) { job->ScheduleBackup(); } +struct ConflictCallbackContext { + std::function filterCallback; + std::function conflictCallback; +}; + // the reason for using static functions here is that SQLite needs a // function pointer -static std::function conflictCallback; static int xConflict(void* pCtx, int eConflict, sqlite3_changeset_iter* pIter) { - if (!conflictCallback) return SQLITE_CHANGESET_ABORT; - return conflictCallback(eConflict); + auto ctx = static_cast(pCtx); + if (!ctx->conflictCallback) return SQLITE_CHANGESET_ABORT; + return ctx->conflictCallback(eConflict); } -static std::function filterCallback; - static int xFilter(void* pCtx, const char* zTab) { - if (!filterCallback) return 1; - - return filterCallback(zTab) ? 1 : 0; + auto ctx = static_cast(pCtx); + if (!ctx->filterCallback) return 1; + return ctx->filterCallback(zTab) ? 1 : 0; } void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo& args) { - conflictCallback = nullptr; - filterCallback = nullptr; + ConflictCallbackContext context; DatabaseSync* db; ASSIGN_OR_RETURN_UNWRAP(&db, args.This()); @@ -1702,7 +1704,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo& args) { return; } Local conflictFunc = conflictValue.As(); - conflictCallback = [env, conflictFunc](int conflictType) -> int { + context.conflictCallback = [env, conflictFunc](int conflictType) -> int { Local argv[] = {Integer::New(env->isolate(), conflictType)}; TryCatch try_catch(env->isolate()); Local result = @@ -1740,7 +1742,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo& args) { Local filterFunc = filterValue.As(); - filterCallback = [env, filterFunc](std::string item) -> bool { + context.filterCallback = [env, filterFunc](std::string item) -> bool { // TODO(@jasnell): The use of ToLocalChecked here means that if // the filter function throws an error the process will crash. // The filterCallback should be updated to avoid the check and @@ -1764,7 +1766,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo& args) { const_cast(static_cast(buf.data())), xFilter, xConflict, - nullptr); + static_cast(&context)); if (r == SQLITE_OK) { args.GetReturnValue().Set(true); return; diff --git a/test/parallel/test-sqlite-session.js b/test/parallel/test-sqlite-session.js index 1fe78c6ec6622a..21271cec05996e 100644 --- a/test/parallel/test-sqlite-session.js +++ b/test/parallel/test-sqlite-session.js @@ -7,6 +7,9 @@ const { constants, } = require('node:sqlite'); const { test, suite } = require('node:test'); +const { nextDb } = require('../sqlite/next-db.js'); +const { Worker } = require('worker_threads'); +const { once } = require('events'); /** * Convenience wrapper around assert.deepStrictEqual that sets a null @@ -555,3 +558,74 @@ test('session supports ERM', (t) => { message: /session is not open/, }); }); + +test('concurrent applyChangeset with workers', async (t) => { + // Before adding this test, the callbacks were stored in static variables + // this could result in a crash + // this test is a regression test for that scenario + + function modeToString(mode) { + if (mode === constants.SQLITE_CHANGESET_ABORT) return 'SQLITE_CHANGESET_ABORT'; + if (mode === constants.SQLITE_CHANGESET_OMIT) return 'SQLITE_CHANGESET_OMIT'; + } + + const dbPath = nextDb(); + const db1 = new DatabaseSync(dbPath); + const db2 = new DatabaseSync(':memory:'); + const createTable = ` + CREATE TABLE data( + key INTEGER PRIMARY KEY, + value TEXT + ) STRICT`; + db1.exec(createTable); + db2.exec(createTable); + db1.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'hello'); + db1.close(); + const session = db2.createSession(); + db2.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'world'); + const changeset = session.changeset(); // Changeset with conflict (for db1) + + const iterations = 10; + for (let i = 0; i < iterations; i++) { + const workers = []; + const expectedResults = new Map([ + [constants.SQLITE_CHANGESET_ABORT, false], + [constants.SQLITE_CHANGESET_OMIT, true]] + ); + + // Launch two workers (abort and omit modes) + for (const mode of [constants.SQLITE_CHANGESET_ABORT, constants.SQLITE_CHANGESET_OMIT]) { + const worker = new Worker(`${__dirname}/../sqlite/worker.js`, { + workerData: { + dbPath, + changeset, + mode + }, + }); + workers.push(worker); + } + + const results = await Promise.all(workers.map(async (worker) => { + const [message] = await once(worker, 'message'); + return message; + })); + + // Verify each result + for (const res of results) { + if (res.errorMessage) { + if (res.errcode === 5) { // SQLITE_BUSY + break; // ignore + } + t.assert.fail(`Worker error: ${res.error.message}`); + } + const expected = expectedResults.get(res.mode); + t.assert.strictEqual( + res.result, + expected, + `Iteration ${i}: Worker (${modeToString(res.mode)}) expected ${expected} but got ${res.result}` + ); + } + + workers.forEach((worker) => worker.terminate()); // Cleanup + } +}); diff --git a/test/parallel/test-sqlite.js b/test/parallel/test-sqlite.js index 717c757c6bfd40..602f2fa2daa46d 100644 --- a/test/parallel/test-sqlite.js +++ b/test/parallel/test-sqlite.js @@ -1,18 +1,10 @@ 'use strict'; const { spawnPromisified, skipIfSQLiteMissing } = require('../common'); skipIfSQLiteMissing(); -const tmpdir = require('../common/tmpdir'); -const { join } = require('node:path'); const { DatabaseSync, constants } = require('node:sqlite'); const { suite, test } = require('node:test'); const { pathToFileURL } = require('node:url'); -let cnt = 0; - -tmpdir.refresh(); - -function nextDb() { - return join(tmpdir.path, `database-${cnt++}.db`); -} +const { nextDb } = require('../sqlite/next-db.js'); suite('accessing the node:sqlite module', () => { test('cannot be accessed without the node: scheme', (t) => { diff --git a/test/sqlite/next-db.js b/test/sqlite/next-db.js new file mode 100644 index 00000000000000..adedc8d7d08c76 --- /dev/null +++ b/test/sqlite/next-db.js @@ -0,0 +1,14 @@ +'use strict'; +require('../../common'); +const tmpdir = require('../common/tmpdir'); +const { join } = require('node:path'); + +let cnt = 0; + +tmpdir.refresh(); + +function nextDb() { + return join(tmpdir.path, `database-${cnt++}.db`); +} + +module.exports = { nextDb }; diff --git a/test/sqlite/worker.js b/test/sqlite/worker.js new file mode 100644 index 00000000000000..524512169ff29e --- /dev/null +++ b/test/sqlite/worker.js @@ -0,0 +1,24 @@ +// This worker is used for one of the tests in test-sqlite-session.js + +'use strict'; +require('../../common'); +const { parentPort, workerData } = require('worker_threads'); +const { DatabaseSync, constants } = require('node:sqlite'); +const { changeset, mode, dbPath } = workerData; + +const db = new DatabaseSync(dbPath); + +const options = {}; +if (mode !== constants.SQLITE_CHANGESET_ABORT && mode !== constants.SQLITE_CHANGESET_OMIT) { + throw new Error('Unexpected value for mode'); +} +options.onConflict = () => mode; + +try { + const result = db.applyChangeset(changeset, options); + parentPort.postMessage({ mode, result, error: null }); +} catch (error) { + parentPort.postMessage({ mode, result: null, errorMessage: error.message, errcode: error.errcode }); +} finally { + db.close(); // Just to make sure it is closed ASAP +} From d19c29359ccb6ccf62f05fa1f4873a055c889442 Mon Sep 17 00:00:00 2001 From: Bart Louwers Date: Thu, 11 Sep 2025 00:03:02 +0200 Subject: [PATCH 2/6] test: fix inclusion of common module --- test/sqlite/next-db.js | 2 +- test/sqlite/worker.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/sqlite/next-db.js b/test/sqlite/next-db.js index adedc8d7d08c76..ae657325362b4c 100644 --- a/test/sqlite/next-db.js +++ b/test/sqlite/next-db.js @@ -1,5 +1,5 @@ 'use strict'; -require('../../common'); +require('../common'); const tmpdir = require('../common/tmpdir'); const { join } = require('node:path'); diff --git a/test/sqlite/worker.js b/test/sqlite/worker.js index 524512169ff29e..8d1ca2420c9332 100644 --- a/test/sqlite/worker.js +++ b/test/sqlite/worker.js @@ -1,7 +1,7 @@ // This worker is used for one of the tests in test-sqlite-session.js 'use strict'; -require('../../common'); +require('../common'); const { parentPort, workerData } = require('worker_threads'); const { DatabaseSync, constants } = require('node:sqlite'); const { changeset, mode, dbPath } = workerData; From 46b18f488dbcf07621850e0b86e41e562a562fbd Mon Sep 17 00:00:00 2001 From: Bart Louwers Date: Thu, 11 Sep 2025 02:00:32 +0200 Subject: [PATCH 3/6] Update src/node_sqlite.cc Co-authored-by: Anna Henningsen --- src/node_sqlite.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/node_sqlite.cc b/src/node_sqlite.cc index d9557f004ac80b..56844c35da7dbf 100644 --- a/src/node_sqlite.cc +++ b/src/node_sqlite.cc @@ -1742,7 +1742,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo& args) { Local filterFunc = filterValue.As(); - context.filterCallback = [env, filterFunc](std::string item) -> bool { + context.filterCallback = [env, filterFunc](std::string_view item) -> bool { // TODO(@jasnell): The use of ToLocalChecked here means that if // the filter function throws an error the process will crash. // The filterCallback should be updated to avoid the check and From 6ebd6728d81f42584db20158808fea49253503e6 Mon Sep 17 00:00:00 2001 From: Bart Louwers Date: Thu, 11 Sep 2025 02:00:44 +0200 Subject: [PATCH 4/6] Update src/node_sqlite.cc Co-authored-by: Anna Henningsen --- src/node_sqlite.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/node_sqlite.cc b/src/node_sqlite.cc index 56844c35da7dbf..1a18584725b457 100644 --- a/src/node_sqlite.cc +++ b/src/node_sqlite.cc @@ -1648,7 +1648,7 @@ void Backup(const FunctionCallbackInfo& args) { } struct ConflictCallbackContext { - std::function filterCallback; + std::function filterCallback; std::function conflictCallback; }; From d0dc60aaf35790aad2b92fbe5a28a40ab130ff27 Mon Sep 17 00:00:00 2001 From: Bart Louwers Date: Thu, 11 Sep 2025 02:23:30 +0200 Subject: [PATCH 5/6] sqlite: fix compilation --- src/node_sqlite.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/node_sqlite.cc b/src/node_sqlite.cc index 1a18584725b457..5cbc00662cb794 100644 --- a/src/node_sqlite.cc +++ b/src/node_sqlite.cc @@ -1748,8 +1748,9 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo& args) { // The filterCallback should be updated to avoid the check and // propagate the error correctly. Local argv[] = {String::NewFromUtf8(env->isolate(), - item.c_str(), - NewStringType::kNormal) + item.data(), + NewStringType::kNormal, + static_cast(item.size())) .ToLocalChecked()}; Local result = filterFunc->Call(env->context(), Null(env->isolate()), 1, argv) From 5ae2b741a506040e7ec5f006522a2d8bdd2b212a Mon Sep 17 00:00:00 2001 From: Bart Louwers Date: Thu, 11 Sep 2025 10:55:33 +0200 Subject: [PATCH 6/6] sqlite: run formatter --- src/node_sqlite.cc | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/node_sqlite.cc b/src/node_sqlite.cc index 5cbc00662cb794..c2f9d48982e8ea 100644 --- a/src/node_sqlite.cc +++ b/src/node_sqlite.cc @@ -1742,16 +1742,18 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo& args) { Local filterFunc = filterValue.As(); - context.filterCallback = [env, filterFunc](std::string_view item) -> bool { + context.filterCallback = [env, + filterFunc](std::string_view item) -> bool { // TODO(@jasnell): The use of ToLocalChecked here means that if // the filter function throws an error the process will crash. // The filterCallback should be updated to avoid the check and // propagate the error correctly. - Local argv[] = {String::NewFromUtf8(env->isolate(), - item.data(), - NewStringType::kNormal, - static_cast(item.size())) - .ToLocalChecked()}; + Local argv[] = { + String::NewFromUtf8(env->isolate(), + item.data(), + NewStringType::kNormal, + static_cast(item.size())) + .ToLocalChecked()}; Local result = filterFunc->Call(env->context(), Null(env->isolate()), 1, argv) .ToLocalChecked();