diff --git a/lib/application.js b/lib/application.js index 57b701cef..765c86771 100644 --- a/lib/application.js +++ b/lib/application.js @@ -253,7 +253,34 @@ function respond(ctx) { // responses if (Buffer.isBuffer(body)) return res.end(body); if ('string' == typeof body) return res.end(body); - if (body instanceof Stream) return body.pipe(res); + if (body instanceof Stream) { + // check if it's an objectMode stream + // readableObjectMode is available since Node 12.3 + if (body.readableObjectMode || + (body._readableState && + body._readableState.objectMode)) { + let first = true; + res.write('['); + body = body + .pipe(new Stream.Transform({ + writableObjectMode: true, + readableObjectMode: false, + transform(data, encoding, callback) { + if (!first) this.push(','); + else first = false; + + this.push(JSON.stringify(data)); + return callback(); + }, + flush(callback) { + this.push(']'); + this.push(null); + return callback(); + } + })); + } + return body.pipe(res); + } // body: json body = JSON.stringify(body); diff --git a/lib/response.js b/lib/response.js index 87ecc56a9..28e471bc0 100644 --- a/lib/response.js +++ b/lib/response.js @@ -166,14 +166,21 @@ module.exports = { } // stream - if ('function' == typeof val.pipe) { + if (val instanceof Stream) { onFinish(this.res, destroy.bind(null, val)); ensureErrorHandler(val, err => this.ctx.onerror(err)); // overwriting if (null != original && original != val) this.remove('Content-Length'); - if (setType) this.type = 'bin'; + if (setType) { + // check if it's an objectMode stream + // readableObjectMode is available since Node 12.3 + this.type = (val.readableObjectMode || + (val._readableState && + val._readableState.objectMode)) + ? 'json' : 'bin'; + } return; } diff --git a/test/application/respond.js b/test/application/respond.js index d254ead6e..e2136d96a 100644 --- a/test/application/respond.js +++ b/test/application/respond.js @@ -6,6 +6,7 @@ const statuses = require('statuses'); const assert = require('assert'); const Koa = require('../..'); const fs = require('fs'); +const { PassThrough } = require('stream'); describe('app.respond', () => { describe('when ctx.respond === false', () => { @@ -618,6 +619,46 @@ describe('app.respond', () => { assert.deepEqual(res.body, pkg); }); + it('should serve object mode streams as JSON', async() => { + const app = new Koa(); + + app.use(ctx => { + const stream = new PassThrough({ objectMode: true, autoDestroy: false }); + ctx.body = stream; + setImmediate(() => { + stream.write({ foo: 1 }); + stream.write({ boo: [true] }); + stream.end({ finish: true }); + }); + }); + + const server = app.listen(); + + const res = await request(server) + .get('/') + .expect('Content-Type', 'application/json; charset=utf-8') + .expect('[{"foo":1},{"boo":[true]},{"finish":true}]'); + assert.strictEqual(res.headers.hasOwnProperty('content-length'), false); + }); + + it('empty object mode stream should result in empty array', async() => { + const app = new Koa(); + + app.use(ctx => { + const stream = new PassThrough({ objectMode: true, autoDestroy: false }); + ctx.body = stream; + setImmediate(() => stream.end()); + }); + + const server = app.listen(); + + const res = await request(server) + .get('/') + .expect('Content-Type', 'application/json; charset=utf-8') + .expect('[]'); + assert.strictEqual(res.headers.hasOwnProperty('content-length'), false); + }); + it('should handle errors', done => { const app = new Koa(); diff --git a/test/response/body.js b/test/response/body.js index 1dfcac55e..aa617c8da 100644 --- a/test/response/body.js +++ b/test/response/body.js @@ -4,6 +4,7 @@ const response = require('../helpers/context').response; const assert = require('assert'); const fs = require('fs'); +const { PassThrough } = require('stream'); describe('res.body=', () => { describe('when Content-Type is set', () => { @@ -108,6 +109,13 @@ describe('res.body=', () => { res.body = fs.createReadStream('LICENSE'); assert.equal('application/octet-stream', res.header['content-type']); }); + + it('object mode stream', () => { + const res = response(); + const stream = new PassThrough({ objectMode: true }); + res.body = stream; + assert.equal('application/json; charset=utf-8', res.header['content-type']); + }); }); describe('when a buffer is given', () => {