Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions postgres_db.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,25 @@ exports.database.prototype.init = function(callback)
'"value" text NOT NULL, ' +
'CONSTRAINT store_pkey PRIMARY KEY (key))';

var createFunc = "CREATE OR REPLACE FUNCTION ueberdb_insert_or_update(character varying, text) " +
var _this = this;

// this variable will be given a value depending on the result of the
// feature detection
_this.upsertStatement = null;

/*
* - Detects if this Postgres version supports INSERT .. ON CONFLICT
* UPDATE (PostgreSQL >= 9.5 and CockroachDB)
* - If upsert is not supported natively, creates in the DB a pl/pgsql
* function that emulates it
* - Performs a side effect, setting _this.upsertStatement to the sql
* statement that needs to be used, based on the detection result
* - calls the callback
*/
function detectUpsertMethod(callback) {
var upsertViaFunction = "SELECT ueberdb_insert_or_update($1,$2)";
var upsertNatively = "INSERT INTO store(key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = excluded.value";
var createFunc = "CREATE OR REPLACE FUNCTION ueberdb_insert_or_update(character varying, text) " +
"RETURNS void AS $$ " +
"BEGIN " +
" IF EXISTS( SELECT * FROM store WHERE key = $1 ) THEN " +
Expand All @@ -50,16 +68,33 @@ exports.database.prototype.init = function(callback)
" RETURN; " +
"END; " +
"$$ LANGUAGE plpgsql;";

this.db.query(createFunc, []);

var _this = this;

var testNativeUpsert = "EXPLAIN " + upsertNatively;

_this.db.query(testNativeUpsert, ["test-key", "test-value"], function(err, results) {
if (err) {
// the UPSERT statement failed: we will have to emulate it via
// an sql function
_this.upsertStatement = upsertViaFunction;

// actually create the emulation function
_this.db.query(createFunc, [], callback);

return;
}

// if we get here, the EXPLAIN UPSERT succeeded, and we can use a
// native UPSERT
_this.upsertStatement = upsertNatively;
callback();
});
}

this.db.query(testTableExists, function(err, result) {
if (result.rows.length == 0) {
_this.db.query(createTable, callback);
_this.db.query(createTable, detectUpsertMethod(callback));
} else {
callback();
detectUpsertMethod(callback);
}
});
}
Expand Down Expand Up @@ -117,7 +152,7 @@ exports.database.prototype.set = function (key, value, callback)
}
else
{
this.db.query("SELECT ueberdb_insert_or_update($1,$2)", [key,value], callback);
this.db.query(_this.upsertStatement, [key,value], callback);
}
}

Expand Down Expand Up @@ -160,7 +195,7 @@ exports.database.prototype.doBulk = function (bulk, callback)
{
if (!replaceVALs.length < 1) {
for (var v in replaceVALs) {
_this.db.query("SELECT ueberdb_insert_or_update($1,$2)", replaceVALs[v], callback);
_this.db.query(_this.upsertStatement, replaceVALs[v], callback);
}
} else {
callback();
Expand Down