diff --git a/.travis.yml b/.travis.yml index 85d3fdf..7ca556b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,9 @@ language: node_js node_js: - - 8 - 10 - 12 + - 14 before_script: git fetch --tags after_success: npm run coverage diff --git a/example/bulk-insert.js b/example/bulk-insert.js index b8e6c28..b7f4447 100644 --- a/example/bulk-insert.js +++ b/example/bulk-insert.js @@ -1,12 +1,12 @@ -var level = require('..') -var path = require('path') -var db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' }) +const level = require('..') +const path = require('path') +const db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' }) db.on('leader', function () { console.log('i am the leader now') }) -var tick = 0 +let tick = 0 function loop () { db.put('hello-' + tick, { hello: 'world-' + tick }, function () { diff --git a/example/get.js b/example/get.js index 43a1236..769eb5b 100644 --- a/example/get.js +++ b/example/get.js @@ -1,6 +1,6 @@ -var level = require('..') -var path = require('path') -var db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' }) +const level = require('..') +const path = require('path') +const db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' }) db.on('leader', function () { console.log('i am the leader now') diff --git a/example/put.js b/example/put.js index 36502b5..f54c1a7 100644 --- a/example/put.js +++ b/example/put.js @@ -1,12 +1,12 @@ -var level = require('..') -var path = require('path') -var db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' }) +const level = require('..') +const path = require('path') +const db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' }) db.on('leader', function () { console.log('i am the leader now') }) -var n = Math.floor(Math.random() * 100000) +let n = Math.floor(Math.random() * 100000) setInterval(function () { db.put('a', n++, function (err) { diff --git a/example/read-stream.js b/example/read-stream.js index f11139f..02bfc0e 100644 --- a/example/read-stream.js +++ b/example/read-stream.js @@ -1,12 +1,12 @@ -var level = require('..') -var path = require('path') -var db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' }) +const level = require('..') +const path = require('path') +const db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' }) db.on('leader', function () { console.log('i am the leader now') }) -var rs = db.createReadStream() +const rs = db.createReadStream() rs.on('end', function () { console.log('(end)') @@ -14,6 +14,6 @@ rs.on('end', function () { }) setInterval(function () { - var next = rs.read() + const next = rs.read() if (next) console.log(next) }, 250) diff --git a/index.js b/index.js index e1c0994..8f58b58 100644 --- a/index.js +++ b/index.js @@ -1,57 +1,79 @@ 'use strict' -var level = require('level') -var has = require('has') -var pump = require('pump') -var fs = require('fs') -var net = require('net') -var path = require('path') -var multileveldown = require('multileveldown') - -module.exports = function (dir, opts) { - if (!opts) opts = {} - if (!has(opts, 'retry')) opts.retry = true - - var sockPath = process.platform === 'win32' +const level = require('level') +const { pipeline: pump } = require('readable-stream') +const fs = require('fs') +const net = require('net') +const path = require('path') +const multileveldown = require('multileveldown') + +module.exports = function (dir, opts = {}) { + const sockPath = process.platform === 'win32' ? '\\\\.\\pipe\\level-party\\' + path.resolve(dir) : path.join(dir, 'level-party.sock') - var client = multileveldown.client(opts) + opts = { retry: true, ...opts } + + const client = multileveldown.client(opts) client.open(tryConnect) function tryConnect () { - if (!client.isOpen()) return + if (!client.isOpen()) { + return + } - var socket = net.connect(sockPath) - var connected = false + const socket = net.connect(sockPath) + let connected = false socket.on('connect', function () { connected = true }) - // we pass socket as the ref option so we dont hang the event loop + // Pass socket as the ref option so we dont hang the event loop. pump(socket, client.createRpcStream({ ref: socket }), socket, function () { - if (!client.isOpen()) return + // TODO: err? + + if (!client.isOpen()) { + return + } - var db = level(dir, opts, onopen) + const db = level(dir, opts, onopen) function onopen (err) { if (err) { - if (connected) return tryConnect() - return setTimeout(tryConnect, 100) + // TODO: This can cause an invisible retry loop that never completes + // and leads to memory leaks. + // TODO: What errors should be retried? + if (connected) { + tryConnect() + } else { + setTimeout(tryConnect, 100) + } + return } fs.unlink(sockPath, function (err) { - if (err && err.code !== 'ENOENT') return db.emit('error', err) - if (!client.isOpen()) return + if (err && err.code !== 'ENOENT') { + // TODO: Is this how to forward errors? + db.emit('error', err) + return + } + + if (!client.isOpen()) { + return + } + + const sockets = new Set() + const server = net.createServer(function (sock) { + if (sock.unref) { + sock.unref() + } - var sockets = [] - var server = net.createServer(function (sock) { - if (sock.unref) sock.unref() - sockets.push(sock) + sockets.add(sock) pump(sock, multileveldown.server(db), sock, function () { - sockets.splice(sockets.indexOf(sock), 1) + // TODO: err? + sockets.delete(sock) }) }) @@ -60,22 +82,33 @@ module.exports = function (dir, opts) { client.forward(db) server.listen(sockPath, onlistening) + .on('error', function () { + // TODO: Is this how to forward errors? + // TODO: tryConnect()? + db.emit('error', err) + }) function shutdown (cb) { - sockets.forEach(function (sock) { + for (const sock of sockets) { sock.destroy() - }) - server.close(function () { + } + server.close(() => { db.close(cb) }) } function onlistening () { - if (server.unref) server.unref() - if (client.isFlushed()) return - - var sock = net.connect(sockPath) - pump(sock, client.createRpcStream(), sock) + if (server.unref) { + server.unref() + } + if (client.isFlushed()) { + return + } + + const sock = net.connect(sockPath) + pump(sock, client.createRpcStream(), sock, function () { + // TODO: err? + }) client.once('flush', function () { sock.destroy() }) @@ -83,7 +116,7 @@ module.exports = function (dir, opts) { }) } }) - }; + } return client } diff --git a/package.json b/package.json index 1e22de7..239cb85 100644 --- a/package.json +++ b/package.json @@ -17,10 +17,9 @@ "prepublishOnly": "npm run dependency-check" }, "dependencies": { - "has": "^1.0.0", "level": "^6.0.0", "multileveldown": "^3.0.0", - "pump": "^3.0.0" + "readable-stream": "^3.6.0" }, "devDependencies": { "bytewise": "^1.1.0", @@ -28,9 +27,9 @@ "dependency-check": "^4.1.0", "hallmark": "^3.1.0", "level-community": "^3.0.0", - "nyc": "^14.1.1", + "nyc": "^15.1.0", "osenv": "~0.1.0", - "standard": "^14.3.1", + "standard": "^16.0.3", "subleveldown": "^5.0.1", "tape": "^5.0.1" }, diff --git a/test/bytewise.js b/test/bytewise.js index a0d7351..96de565 100644 --- a/test/bytewise.js +++ b/test/bytewise.js @@ -1,17 +1,17 @@ -var test = require('tape') -var level = require('..') -var path = require('path') -var bytewise = require('bytewise') -var tmpdir = require('osenv').tmpdir() -var datadir = path.join(tmpdir, 'level-party-' + Math.random()) +const test = require('tape') +const level = require('..') +const path = require('path') +const bytewise = require('bytewise') +const tmpdir = require('osenv').tmpdir() +const datadir = path.join(tmpdir, 'level-party-' + Math.random()) -var lopts = { keyEncoding: bytewise, valueEncoding: 'json' } +const lopts = { keyEncoding: bytewise, valueEncoding: 'json' } test('bytewise key encoding', function (t) { t.plan(7) - var adb = level(datadir, lopts) - var bdb = level(datadir, lopts) - var value = Math.floor(Math.random() * 100000) + const adb = level(datadir, lopts) + const bdb = level(datadir, lopts) + const value = Math.floor(Math.random() * 100000) adb.put(['a'], value, function (err) { t.ifError(err) diff --git a/test/election.js b/test/election.js index 77ba3a3..c98f307 100644 --- a/test/election.js +++ b/test/election.js @@ -1,35 +1,35 @@ -var test = require('tape') -var level = require('..') -var path = require('path') -var tmpdir = require('osenv').tmpdir() -var datadir = path.join(tmpdir, 'level-party-' + Math.random()) +const test = require('tape') +const level = require('..') +const path = require('path') +const tmpdir = require('osenv').tmpdir() +const datadir = path.join(tmpdir, 'level-party-' + Math.random()) test('failover election party', function (t) { - var keys = ['a', 'b', 'c', 'e', 'f', 'g'] - var len = keys.length + const keys = ['a', 'b', 'c', 'e', 'f', 'g'] + const len = keys.length t.plan(len * (len + 1) / 2) - var pending = keys.length - var handles = {} + let pending = keys.length + const handles = {} keys.forEach(function (key) { - var h = open(key) + const h = open(key) h.on('open', function () { if (--pending === 0) spinDown() }) }) function open (key) { - var h = handles[key] = level(datadir, { valueEncoding: 'json' }) + const h = handles[key] = level(datadir, { valueEncoding: 'json' }) return h } function spinDown () { - var alive = keys.slice(); + const alive = keys.slice(); (function next () { if (alive.length === 0) return check(alive, function () { - var key = alive.shift() + const key = alive.shift() handles[key].close() next() }) @@ -37,11 +37,11 @@ test('failover election party', function (t) { } function check (keys, cb) { - var pending = keys.length + let pending = keys.length if (pending === 0) return cb() - for (var i = 0; i < keys.length; i++) { + for (let i = 0; i < keys.length; i++) { (function (a, b) { - var value = Math.random() + const value = Math.random() handles[a].put(a, value, function (err) { if (err) t.fail(err) handles[b].get(a, function (err, x) { diff --git a/test/handle.js b/test/handle.js index a83e000..d73d65e 100644 --- a/test/handle.js +++ b/test/handle.js @@ -1,15 +1,15 @@ -var test = require('tape') -var level = require('..') -var path = require('path') -var tmpdir = require('osenv').tmpdir() -var datadir = path.join(tmpdir, 'level-party-' + Math.random()) +const test = require('tape') +const level = require('..') +const path = require('path') +const tmpdir = require('osenv').tmpdir() +const datadir = path.join(tmpdir, 'level-party-' + Math.random()) test('two handles', function (t) { t.plan(1) - var adb = level(datadir, { valueEncoding: 'json' }) - var bdb = level(datadir, { valueEncoding: 'json' }) - var value = Math.floor(Math.random() * 100000) + const adb = level(datadir, { valueEncoding: 'json' }) + const bdb = level(datadir, { valueEncoding: 'json' }) + const value = Math.floor(Math.random() * 100000) adb.put('a', value, function (err) { if (err) t.fail(err) diff --git a/test/subleveldown.js b/test/subleveldown.js index 1de083b..dc729b1 100644 --- a/test/subleveldown.js +++ b/test/subleveldown.js @@ -1,18 +1,18 @@ -var test = require('tape') -var level = require('..') -var path = require('path') -var sub = require('subleveldown') -var tmpdir = require('osenv').tmpdir() -var datadir = path.join(tmpdir, 'level-party-' + Math.random()) +const test = require('tape') +const level = require('..') +const path = require('path') +const sub = require('subleveldown') +const tmpdir = require('osenv').tmpdir() +const datadir = path.join(tmpdir, 'level-party-' + Math.random()) test('subleveldown on level-party', function (t) { t.plan(9) - var a = level(datadir) - var b = level(datadir) - var asub = sub(a, 'test', { valueEncoding: 'json' }) - var bsub = sub(b, 'test') - var obj = { test: Math.floor(Math.random() * 100000) } + const a = level(datadir) + const b = level(datadir) + const asub = sub(a, 'test', { valueEncoding: 'json' }) + const bsub = sub(b, 'test') + const obj = { test: Math.floor(Math.random() * 100000) } asub.put('a', obj, function (err) { t.ifError(err)