Skip to content
This repository was archived by the owner on Dec 2, 2024. It is now read-only.
/ party Public archive
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
language: node_js

node_js:
- 8
- 10
- 12
- 14

before_script: git fetch --tags
after_success: npm run coverage
Expand Down
8 changes: 4 additions & 4 deletions example/bulk-insert.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
var level = require('..')
var path = require('path')
var db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' })
const level = require('..')
const path = require('path')
const db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' })

db.on('leader', function () {
console.log('i am the leader now')
})

var tick = 0
let tick = 0

function loop () {
db.put('hello-' + tick, { hello: 'world-' + tick }, function () {
Expand Down
6 changes: 3 additions & 3 deletions example/get.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
var level = require('..')
var path = require('path')
var db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' })
const level = require('..')
const path = require('path')
const db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' })

db.on('leader', function () {
console.log('i am the leader now')
Expand Down
8 changes: 4 additions & 4 deletions example/put.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
var level = require('..')
var path = require('path')
var db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' })
const level = require('..')
const path = require('path')
const db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' })

db.on('leader', function () {
console.log('i am the leader now')
})

var n = Math.floor(Math.random() * 100000)
let n = Math.floor(Math.random() * 100000)

setInterval(function () {
db.put('a', n++, function (err) {
Expand Down
10 changes: 5 additions & 5 deletions example/read-stream.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
var level = require('..')
var path = require('path')
var db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' })
const level = require('..')
const path = require('path')
const db = level(path.join(__dirname, 'data'), { valueEncoding: 'json' })

db.on('leader', function () {
console.log('i am the leader now')
})

var rs = db.createReadStream()
const rs = db.createReadStream()

rs.on('end', function () {
console.log('(end)')
process.exit()
})

setInterval(function () {
var next = rs.read()
const next = rs.read()
if (next) console.log(next)
}, 250)
109 changes: 71 additions & 38 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,57 +1,79 @@
'use strict'

var level = require('level')
var has = require('has')
Comment thread
ronag marked this conversation as resolved.
var pump = require('pump')
var fs = require('fs')
var net = require('net')
var path = require('path')
var multileveldown = require('multileveldown')

module.exports = function (dir, opts) {
if (!opts) opts = {}
if (!has(opts, 'retry')) opts.retry = true

var sockPath = process.platform === 'win32'
const level = require('level')
const { pipeline: pump } = require('readable-stream')
const fs = require('fs')
const net = require('net')
const path = require('path')
const multileveldown = require('multileveldown')

module.exports = function (dir, opts = {}) {
const sockPath = process.platform === 'win32'
? '\\\\.\\pipe\\level-party\\' + path.resolve(dir)
: path.join(dir, 'level-party.sock')

var client = multileveldown.client(opts)
opts = { retry: true, ...opts }

const client = multileveldown.client(opts)

client.open(tryConnect)

function tryConnect () {
if (!client.isOpen()) return
if (!client.isOpen()) {
return
}

var socket = net.connect(sockPath)
var connected = false
const socket = net.connect(sockPath)
let connected = false

socket.on('connect', function () {
connected = true
})

// we pass socket as the ref option so we dont hang the event loop
// Pass socket as the ref option so we dont hang the event loop.
pump(socket, client.createRpcStream({ ref: socket }), socket, function () {
if (!client.isOpen()) return
// TODO: err?

if (!client.isOpen()) {
return
}

var db = level(dir, opts, onopen)
const db = level(dir, opts, onopen)

function onopen (err) {
if (err) {
if (connected) return tryConnect()
return setTimeout(tryConnect, 100)
// TODO: This can cause an invisible retry loop that never completes
// and leads to memory leaks.
// TODO: What errors should be retried?
if (connected) {
tryConnect()
} else {
setTimeout(tryConnect, 100)
}
Comment thread
ronag marked this conversation as resolved.
return
}

fs.unlink(sockPath, function (err) {
if (err && err.code !== 'ENOENT') return db.emit('error', err)
if (!client.isOpen()) return
if (err && err.code !== 'ENOENT') {
// TODO: Is this how to forward errors?
db.emit('error', err)
return
}

if (!client.isOpen()) {
return
}

const sockets = new Set()
const server = net.createServer(function (sock) {
if (sock.unref) {
sock.unref()
}

var sockets = []
var server = net.createServer(function (sock) {
if (sock.unref) sock.unref()
sockets.push(sock)
sockets.add(sock)
pump(sock, multileveldown.server(db), sock, function () {
sockets.splice(sockets.indexOf(sock), 1)
// TODO: err?
sockets.delete(sock)
})
})

Expand All @@ -60,30 +82,41 @@ module.exports = function (dir, opts) {
client.forward(db)

server.listen(sockPath, onlistening)
.on('error', function () {
// TODO: Is this how to forward errors?
// TODO: tryConnect()?
db.emit('error', err)
})

function shutdown (cb) {
sockets.forEach(function (sock) {
for (const sock of sockets) {
sock.destroy()
})
server.close(function () {
}
server.close(() => {
db.close(cb)
})
}

function onlistening () {
if (server.unref) server.unref()
if (client.isFlushed()) return

var sock = net.connect(sockPath)
pump(sock, client.createRpcStream(), sock)
if (server.unref) {
server.unref()
}
if (client.isFlushed()) {
return
}

const sock = net.connect(sockPath)
pump(sock, client.createRpcStream(), sock, function () {
// TODO: err?
})
client.once('flush', function () {
sock.destroy()
})
}
})
}
})
};
}

return client
}
7 changes: 3 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@
"prepublishOnly": "npm run dependency-check"
},
"dependencies": {
"has": "^1.0.0",
"level": "^6.0.0",
"multileveldown": "^3.0.0",
"pump": "^3.0.0"
"readable-stream": "^3.6.0"
},
"devDependencies": {
"bytewise": "^1.1.0",
"coveralls": "^3.0.9",
"dependency-check": "^4.1.0",
"hallmark": "^3.1.0",
"level-community": "^3.0.0",
"nyc": "^14.1.1",
"nyc": "^15.1.0",
"osenv": "~0.1.0",
"standard": "^14.3.1",
"standard": "^16.0.3",
"subleveldown": "^5.0.1",
"tape": "^5.0.1"
},
Expand Down
20 changes: 10 additions & 10 deletions test/bytewise.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
var test = require('tape')
var level = require('..')
var path = require('path')
var bytewise = require('bytewise')
var tmpdir = require('osenv').tmpdir()
var datadir = path.join(tmpdir, 'level-party-' + Math.random())
const test = require('tape')
const level = require('..')
const path = require('path')
const bytewise = require('bytewise')
const tmpdir = require('osenv').tmpdir()
const datadir = path.join(tmpdir, 'level-party-' + Math.random())

var lopts = { keyEncoding: bytewise, valueEncoding: 'json' }
const lopts = { keyEncoding: bytewise, valueEncoding: 'json' }

test('bytewise key encoding', function (t) {
t.plan(7)
var adb = level(datadir, lopts)
var bdb = level(datadir, lopts)
var value = Math.floor(Math.random() * 100000)
const adb = level(datadir, lopts)
const bdb = level(datadir, lopts)
const value = Math.floor(Math.random() * 100000)

adb.put(['a'], value, function (err) {
t.ifError(err)
Expand Down
32 changes: 16 additions & 16 deletions test/election.js
Original file line number Diff line number Diff line change
@@ -1,47 +1,47 @@
var test = require('tape')
var level = require('..')
var path = require('path')
var tmpdir = require('osenv').tmpdir()
var datadir = path.join(tmpdir, 'level-party-' + Math.random())
const test = require('tape')
const level = require('..')
const path = require('path')
const tmpdir = require('osenv').tmpdir()
const datadir = path.join(tmpdir, 'level-party-' + Math.random())

test('failover election party', function (t) {
var keys = ['a', 'b', 'c', 'e', 'f', 'g']
var len = keys.length
const keys = ['a', 'b', 'c', 'e', 'f', 'g']
const len = keys.length
t.plan(len * (len + 1) / 2)
var pending = keys.length
var handles = {}
let pending = keys.length
const handles = {}

keys.forEach(function (key) {
var h = open(key)
const h = open(key)
h.on('open', function () {
if (--pending === 0) spinDown()
})
})

function open (key) {
var h = handles[key] = level(datadir, { valueEncoding: 'json' })
const h = handles[key] = level(datadir, { valueEncoding: 'json' })
return h
}

function spinDown () {
var alive = keys.slice();
const alive = keys.slice();
(function next () {
if (alive.length === 0) return

check(alive, function () {
var key = alive.shift()
const key = alive.shift()
handles[key].close()
next()
})
})()
}

function check (keys, cb) {
var pending = keys.length
let pending = keys.length
if (pending === 0) return cb()
for (var i = 0; i < keys.length; i++) {
for (let i = 0; i < keys.length; i++) {
(function (a, b) {
var value = Math.random()
const value = Math.random()
handles[a].put(a, value, function (err) {
if (err) t.fail(err)
handles[b].get(a, function (err, x) {
Expand Down
16 changes: 8 additions & 8 deletions test/handle.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
var test = require('tape')
var level = require('..')
var path = require('path')
var tmpdir = require('osenv').tmpdir()
var datadir = path.join(tmpdir, 'level-party-' + Math.random())
const test = require('tape')
const level = require('..')
const path = require('path')
const tmpdir = require('osenv').tmpdir()
const datadir = path.join(tmpdir, 'level-party-' + Math.random())

test('two handles', function (t) {
t.plan(1)

var adb = level(datadir, { valueEncoding: 'json' })
var bdb = level(datadir, { valueEncoding: 'json' })
var value = Math.floor(Math.random() * 100000)
const adb = level(datadir, { valueEncoding: 'json' })
const bdb = level(datadir, { valueEncoding: 'json' })
const value = Math.floor(Math.random() * 100000)

adb.put('a', value, function (err) {
if (err) t.fail(err)
Expand Down
Loading