diff --git a/src/createEngineStream.ts b/src/createEngineStream.ts index 01b0b29..7fddfe1 100644 --- a/src/createEngineStream.ts +++ b/src/createEngineStream.ts @@ -21,9 +21,9 @@ export default function createEngineStream(opts: EngineStreamOptions): Duplex { const stream = new Duplex({ objectMode: true, read, write }); // forward notifications if (engine.on) { - engine.on('notification', (message) => { - stream.push(message); - }); + const onNotification = (message: unknown) => stream.push(message); + engine.on('notification', onNotification); + stream.on('close', () => engine.off('notification', onNotification)); } return stream; diff --git a/test/index.js b/test/index.js index 2be7975..1a98d25 100644 --- a/test/index.js +++ b/test/index.js @@ -3,7 +3,6 @@ const { JsonRpcEngine } = require('json-rpc-engine'); const { createStreamMiddleware, createEngineStream } = require('../dist'); test('middleware - raw test', (t) => { - const jsonRpcConnection = createStreamMiddleware(); const req = { id: 1, jsonrpc: '2.0', method: 'test' }; const initRes = { id: 1, jsonrpc: '2.0' }; @@ -16,17 +15,21 @@ test('middleware - raw test', (t) => { }); // run middleware, expect end fn to be called - jsonRpcConnection.middleware(req, initRes, () => { - t.fail('should not call next'); - }, (err) => { - t.notOk(err, 'should not error'); - t.deepEqual(initRes, res, 'got the expected response'); - t.end(); - }); + jsonRpcConnection.middleware( + req, + initRes, + () => { + t.fail('should not call next'); + }, + (err) => { + t.notOk(err, 'should not error'); + t.deepEqual(initRes, res, 'got the expected response'); + t.end(); + }, + ); }); test('engine to stream - raw test', (t) => { - const engine = new JsonRpcEngine(); engine.push((_req, res, _next, end) => { res.result = 'test'; @@ -51,7 +54,6 @@ test('engine to stream - raw test', (t) => { }); test('middleware and engine to stream', (t) => { - // create guest const engineA = new JsonRpcEngine(); const jsonRpcConnection = createStreamMiddleware(); @@ -67,9 +69,7 @@ test('middleware and engine to stream', (t) => { // connect both const clientSideStream = jsonRpcConnection.stream; const hostSideStream = createEngineStream({ engine: engineB }); - clientSideStream - .pipe(hostSideStream) - .pipe(clientSideStream); + clientSideStream.pipe(hostSideStream).pipe(clientSideStream); // request and expected result const req = { id: 1, jsonrpc: '2.0', method: 'test' }; @@ -115,3 +115,14 @@ test('server notification in stream', (t) => { engine.emit('notification', notif); }); + +test('clean up listener', (t) => { + const n = 10; + const engine = new JsonRpcEngine(); + for (let i = 0; i < n; i++) { + const stream = createEngineStream({ engine }); + stream.end(); + } + t.equal(engine.listenerCount(), 0); + t.end(); +});