Skip to content
This repository was archived by the owner on Dec 2, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions client.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
var levelup = require('levelup')
var encode = require('encoding-down')
var leveldown = require('./leveldown')
const levelup = require('levelup')
const encode = require('encoding-down')
const leveldown = require('./leveldown')

module.exports = function (opts) {
if (!opts) opts = {}

var down = leveldown(Object.assign({}, opts, { onflush: onflush }))
var db = levelup(encode(down, opts), opts)
const down = leveldown(Object.assign({}, opts, { onflush: onflush }))
const db = levelup(encode(down, opts), opts)

db.createRpcStream = db.connect = connect
db.isFlushed = isFlushed
Expand Down
77 changes: 39 additions & 38 deletions leveldown.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
var duplexify = require('duplexify')
var abstract = require('abstract-leveldown')
var util = require('util')
var eos = require('end-of-stream')
var ids = require('numeric-id-map')
var lpstream = require('length-prefixed-stream')
var reachdown = require('reachdown')
var messages = require('./messages')
var matchdown = require('./matchdown')

var ENCODERS = [
const duplexify = require('duplexify')
const abstract = require('abstract-leveldown')
const util = require('util')
const eos = require('end-of-stream')
const ids = require('numeric-id-map')
const lpstream = require('length-prefixed-stream')
const reachdown = require('reachdown')
const messages = require('./messages')
const matchdown = require('./matchdown')

const ENCODERS = [
messages.Get,
messages.Put,
messages.Delete,
messages.Batch,
messages.Iterator
]

var DECODERS = [
const DECODERS = [
messages.Callback,
messages.IteratorData
]
Expand Down Expand Up @@ -47,18 +47,19 @@ Multilevel.prototype.createRpcStream = function (opts, proxy) {
if (!opts) opts = {}
this._ref = opts.ref || null

var self = this
var encode = this._encode
var decode = lpstream.decode()
const self = this
const encode = this._encode
const decode = lpstream.decode()

decode.on('data', function (data) {
if (!data.length) return
var tag = data[0]
const tag = data[0]
if (tag >= DECODERS.length) return

var dec = DECODERS[tag]
const dec = DECODERS[tag]
let res
try {
var res = dec.decode(data, 1)
res = dec.decode(data, 1)
} catch (err) {
return
}
Expand Down Expand Up @@ -93,29 +94,29 @@ Multilevel.prototype.createRpcStream = function (opts, proxy) {
return
}

for (var i = 0; i < self._requests.length; i++) {
var req = self._requests.get(i)
for (let i = 0; i < self._requests.length; i++) {
const req = self._requests.get(i)
if (!req) continue
self._write(req)
}

for (var j = 0; j < self._iterators.length; j++) {
var ite = self._iterators.get(j)
for (let j = 0; j < self._iterators.length; j++) {
const ite = self._iterators.get(j)
if (!ite) continue
ite.options = ite.iterator._options
self._write(ite)
}
}

function oniteratordata (res) {
var req = self._iterators.get(res.id)
const req = self._iterators.get(res.id)
if (!req) return
req.pending.push(res)
if (req.callback) req.iterator.next(req.callback)
}

function oncallback (res) {
var req = self._requests.remove(res.id)
const req = self._requests.remove(res.id)
if (req) req.callback(decodeError(res.error), decodeValue(res.value, req.valueAsBuffer))
}
}
Expand All @@ -135,13 +136,13 @@ Multilevel.prototype._flushMaybe = function () {
}

Multilevel.prototype._clearRequests = function (closing) {
for (var i = 0; i < this._requests.length; i++) {
var req = this._requests.remove(i)
for (let i = 0; i < this._requests.length; i++) {
const req = this._requests.remove(i)
if (req) req.callback(new Error('Connection to leader lost'))
}

for (var j = 0; j < this._iterators.length; j++) {
var ite = this._iterators.remove(j)
for (let j = 0; j < this._iterators.length; j++) {
const ite = this._iterators.remove(j)
if (ite) {
if (ite.callback && !closing) ite.callback(new Error('Connection to leader lost'))
ite.iterator.end()
Expand All @@ -160,7 +161,7 @@ Multilevel.prototype._serializeValue = function (value) {
Multilevel.prototype._get = function (key, opts, cb) {
if (this._db) return this._db._get(key, opts, cb)

var req = {
const req = {
tag: 0,
id: 0,
key: key,
Expand All @@ -175,7 +176,7 @@ Multilevel.prototype._get = function (key, opts, cb) {
Multilevel.prototype._put = function (key, value, opts, cb) {
if (this._db) return this._db._put(key, value, opts, cb)

var req = {
const req = {
tag: 1,
id: 0,
key: key,
Expand All @@ -190,7 +191,7 @@ Multilevel.prototype._put = function (key, value, opts, cb) {
Multilevel.prototype._del = function (key, opts, cb) {
if (this._db) return this._db._del(key, opts, cb)

var req = {
const req = {
tag: 2,
id: 0,
key: key,
Expand All @@ -204,7 +205,7 @@ Multilevel.prototype._del = function (key, opts, cb) {
Multilevel.prototype._batch = function (batch, opts, cb) {
if (this._db) return this._db._batch(batch, opts, cb)

var req = {
const req = {
tag: 3,
id: 0,
ops: batch,
Expand All @@ -217,8 +218,8 @@ Multilevel.prototype._batch = function (batch, opts, cb) {

Multilevel.prototype._write = function (req) {
if (this._requests.length + this._iterators.length === 1) ref(this._ref)
var enc = ENCODERS[req.tag]
var buf = Buffer.allocUnsafe(enc.encodingLength(req) + 1)
const enc = ENCODERS[req.tag]
const buf = Buffer.allocUnsafe(enc.encodingLength(req) + 1)
buf[0] = req.tag
enc.encode(req, buf, 1)
this._encode.write(buf)
Expand Down Expand Up @@ -250,7 +251,7 @@ function Iterator (parent, opts) {
this._valueAsBuffer = opts.valueAsBuffer
this._options = opts

var req = {
const req = {
tag: 4,
id: 0,
batch: 32,
Expand Down Expand Up @@ -279,16 +280,16 @@ Iterator.prototype.next = function (cb) {
this._parent._write(this._req)
}

var next = this._req.pending.shift()
const next = this._req.pending.shift()
if (next.error) return cb(decodeError(next.error))

if (!next.key && !next.value) return cb()

this._options.gt = next.key
if (this._options.limit > 0) this._options.limit--

var key = decodeValue(next.key, this._keyAsBuffer)
var val = decodeValue(next.value, this._valueAsBuffer)
const key = decodeValue(next.key, this._keyAsBuffer)
const val = decodeValue(next.value, this._valueAsBuffer)
return cb(undefined, key, val)
}

Expand Down
11 changes: 8 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
"level-community": "^3.0.0",
"level-compose": "^0.0.2",
"memdown": "^5.0.0",
"nyc": "^14.1.1",
"nyc": "^15.1.0",
"protocol-buffers": "^4.0.2",
"standard": "^14.1.0",
"standard": "^16.0.3",
"subleveldown": "^5.0.1",
"tape": "^5.0.1"
},
Expand All @@ -52,6 +52,11 @@
},
"homepage": "https://github.com/Level/multileveldown",
"engines": {
"node": ">=8"
"node": ">=10"
},
"standard": {
"ignore": [
"messages.js"
]
}
}
59 changes: 30 additions & 29 deletions server.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
var lpstream = require('length-prefixed-stream')
var eos = require('end-of-stream')
var duplexify = require('duplexify')
var reachdown = require('reachdown')
var messages = require('./messages')
var rangeOptions = 'gt gte lt lte'.split(' ')
var matchdown = require('./matchdown')

var DECODERS = [
const lpstream = require('length-prefixed-stream')
const eos = require('end-of-stream')
const duplexify = require('duplexify')
const reachdown = require('reachdown')
const messages = require('./messages')
const rangeOptions = 'gt gte lt lte'.split(' ')
const matchdown = require('./matchdown')

const DECODERS = [
messages.Get,
messages.Put,
messages.Delete,
Expand All @@ -17,39 +17,40 @@ var DECODERS = [
module.exports = function (db, opts) {
if (!opts) opts = {}

var readonly = !!(opts.readonly)
var decode = lpstream.decode()
var encode = lpstream.encode()
var stream = duplexify(decode, encode)
const readonly = !!(opts.readonly)
const decode = lpstream.decode()
const encode = lpstream.encode()
const stream = duplexify(decode, encode)

var preput = opts.preput || function (key, val, cb) { cb(null) }
var predel = opts.predel || function (key, cb) { cb(null) }
var prebatch = opts.prebatch || function (ops, cb) { cb(null) }
const preput = opts.preput || function (key, val, cb) { cb(null) }
const predel = opts.predel || function (key, cb) { cb(null) }
const prebatch = opts.prebatch || function (ops, cb) { cb(null) }

if (db.isOpen()) ready()
else db.open(ready)

return stream

function ready () {
var down = reachdown(db, matchdown, false)
var iterators = []
const down = reachdown(db, matchdown, false)
const iterators = []

eos(stream, function () {
while (iterators.length) {
var next = iterators.shift()
const next = iterators.shift()
if (next) next.end()
}
})

decode.on('data', function (data) {
if (!data.length) return
var tag = data[0]
const tag = data[0]
if (tag >= DECODERS.length) return

var dec = DECODERS[tag]
const dec = DECODERS[tag]
let req
try {
var req = dec.decode(data, 1)
req = dec.decode(data, 1)
} catch (err) {
return
}
Expand All @@ -74,8 +75,8 @@ module.exports = function (db, opts) {
})

function callback (id, err, value) {
var msg = { id: id, error: err && err.message, value: value }
var buf = Buffer.allocUnsafe(messages.Callback.encodingLength(msg) + 1)
const msg = { id: id, error: err && err.message, value: value }
const buf = Buffer.allocUnsafe(messages.Callback.encodingLength(msg) + 1)
buf[0] = 0
messages.Callback.encode(msg, buf, 1)
encode.write(buf)
Expand Down Expand Up @@ -122,7 +123,7 @@ module.exports = function (db, opts) {
function oniterator (req) {
while (iterators.length < req.id) iterators.push(null)

var prev = iterators[req.id]
let prev = iterators[req.id]
if (!prev) prev = iterators[req.id] = new Iterator(down, req, encode)

if (!req.batch) {
Expand All @@ -137,7 +138,7 @@ module.exports = function (db, opts) {
}

function Iterator (down, req, encode) {
var self = this
const self = this

this.batch = req.batch || 0
this._iterator = down.iterator(cleanRangeOptions(req.options))
Expand All @@ -159,7 +160,7 @@ function Iterator (down, req, encode) {
self._data.key = key
self._data.value = value
self.batch--
var buf = Buffer.allocUnsafe(messages.IteratorData.encodingLength(self._data) + 1)
const buf = Buffer.allocUnsafe(messages.IteratorData.encodingLength(self._data) + 1)
buf[0] = 1
messages.IteratorData.encode(self._data, buf, 1)
encode.write(buf)
Expand All @@ -185,9 +186,9 @@ function noop () {}
function cleanRangeOptions (options) {
if (!options) return

var result = {}
const result = {}

for (var k in options) {
for (const k in options) {
if (!hasOwnProperty.call(options, k)) continue

if (!isRangeOption(k) || options[k] != null) {
Expand Down
6 changes: 3 additions & 3 deletions streams.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
var pbs = require('pbs')
var fs = require('fs')
var path = require('path')
const pbs = require('pbs')
const fs = require('fs')
const path = require('path')

module.exports = pbs(fs.readFileSync(path.join(__dirname, 'schema.proto')))
20 changes: 10 additions & 10 deletions test/abstract.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
var test = require('tape')
var memdown = require('memdown')
var levelup = require('levelup')
var encode = require('encoding-down')
var factory = require('level-compose')(memdown, encode, levelup)
var multileveldown = require('..')
var suite = require('abstract-leveldown/test')
const test = require('tape')
const memdown = require('memdown')
const levelup = require('levelup')
const encode = require('encoding-down')
const factory = require('level-compose')(memdown, encode, levelup)
const multileveldown = require('..')
const suite = require('abstract-leveldown/test')

suite({
test: test,
factory: function () {
var db = factory()
var stream = multileveldown.server(db)
var client = multileveldown.client()
const db = factory()
const stream = multileveldown.server(db)
const client = multileveldown.client()

stream.pipe(client.createRpcStream()).pipe(stream)

Expand Down
Loading