Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 18 additions & 23 deletions lib/common/stream-router.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
'use strict';

var concat = require('concat-stream');
var isStreamEnded = require('is-stream-ended');
var split = require('split-array-stream');
var streamEvents = require('stream-events');
var through = require('through2');

Expand Down Expand Up @@ -182,9 +182,9 @@ streamRouter.runAsStream_ = function(parsedArguments, originalMethod) {

var stream = streamEvents(through.obj());

function shouldPushResult() {
return resultsToSend !== 0 && !isStreamEnded(stream);
}
stream.once('reading', function() {
originalMethod(query, onResultSet);
});

function onResultSet(err, results, nextQuery) {
if (err) {
Expand All @@ -193,32 +193,27 @@ streamRouter.runAsStream_ = function(parsedArguments, originalMethod) {
return;
}

var result;
while ((result = results.shift()) && shouldPushResult()) {
stream.push(result);
resultsToSend--;
if (resultsToSend >= 0 && results.length > resultsToSend) {
results = results.splice(0, resultsToSend);
}

if (isStreamEnded(stream)) {
return;
}
resultsToSend -= results.length;

if (resultsToSend === 0) {
stream.end();
return;
}
split(results, stream, function(streamEnded) {
if (streamEnded) {
return;
}

if (nextQuery) {
originalMethod(nextQuery, onResultSet);
} else {
if (nextQuery && resultsToSend !== 0) {
originalMethod(nextQuery, onResultSet);
return;
}

stream.push(null);
stream.end();
}
});
}

stream.once('reading', function() {
originalMethod(query, onResultSet);
});

return stream;
};

Expand Down
78 changes: 30 additions & 48 deletions lib/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

'use strict';

var isStreamEnded = require('is-stream-ended');
var concat = require('concat-stream');
var request = require('request').defaults({
pool: {
maxSockets: Infinity
}
});
var split = require('split-array-stream');
var through = require('through2');

/**
Expand Down Expand Up @@ -101,7 +102,6 @@ function DatastoreRequest() {}
* @param {?error} callback.err - An error returned while making this request
* @param {module:datastore/entity|module:datastore/entity[]} callback.entity -
* Will return either a single Entity or a list of Entities.
* @param {object} callback.apiResponse - The full API response.
*
* @example
* //-
Expand All @@ -114,7 +114,7 @@ function DatastoreRequest() {}
* //-
* var key = dataset.key(['Company', 123]);
*
* transaction.get(key, function(err, entity, apiResponse) {});
* transaction.get(key, function(err, entity) {});
*
* //-
* // Get multiple entities at once with a callback.
Expand All @@ -124,13 +124,13 @@ function DatastoreRequest() {}
* dataset.key(['Product', 'Computer'])
* ];
*
* transaction.get(keys, function(err, entities, apiResponse) {});
* transaction.get(keys, function(err, entities) {});
*
* //-
* // Or, get the entities as a readable object stream.
* //-
* transaction.get(keys)
* .on('error', function(err, apiResponse) {})
* .on('error', function(err) {})
* .on('data', function(entity) {
* // entity is an entity object.
* })
Expand All @@ -139,72 +139,54 @@ function DatastoreRequest() {}
* });
*/
DatastoreRequest.prototype.get = function(keys, callback) {
var self = this;

var isStreamMode = !callback;
var stream;

if (isStreamMode) {
stream = through.obj();
if (util.is(callback, 'function')) {
// Run this method in stream mode and send the results back to the callback.
this.get(keys)
.on('error', callback)
.pipe(concat(function(results) {
var isSingleLookup = !util.is(keys, 'array');
callback(null, isSingleLookup ? results[0] : results);
}));
return;
}

var isSingleLookup = !util.is(keys, 'array');
keys = util.arrayize(keys).map(entity.keyToKeyProto);

if (keys.length === 0) {
throw new Error('At least one Key object is required.');
}

var request = {
key: keys
};

var entities = [];
this.makeReq_('lookup', request, onApiResponse);
var self = this;
var stream = through.obj();

function onApiResponse(err, resp) {
if (err) {
if (isStreamMode) {
stream.emit('error', err, resp);
stream.end();
} else {
callback(err, null, resp);
}
stream.emit('error', err);
stream.end();
return;
}

var results = entity.formatArray(resp.found);
var entities = entity.formatArray(resp.found);
var nextKeys = (resp.deferred || []).map(entity.keyFromKeyProto);

if (isStreamMode) {
var result;
while ((result = results.shift()) && !isStreamEnded(stream)) {
stream.push(result);
split(entities, stream, function(streamEnded) {
if (streamEnded) {
return;
}
} else {
entities = entities.concat(results);
}

if (isStreamMode && isStreamEnded(stream)) {
return;
}

if (nextKeys.length > 0) {
self.get(nextKeys, onApiResponse);
return;
}
if (nextKeys.length > 0) {
self.get(nextKeys, onApiResponse);
return;
}

if (isStreamMode) {
stream.push(null);
stream.end();
} else {
callback(null, isSingleLookup ? entities[0] : entities, resp);
}
});
}

if (isStreamMode) {
return stream;
}
this.makeReq_('lookup', { key: keys }, onApiResponse);

return stream;
};

/**
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@
"duplexify": "^3.2.0",
"extend": "^2.0.0",
"google-auth-library": "^0.9.4",
"is-stream-ended": "^0.1.0",
"mime-types": "^2.0.8",
"node-uuid": "^1.4.2",
"once": "^1.3.1",
"protobufjs": "^3.8.2",
"request": "^2.53.0",
"retry-request": "^1.1.0",
"split-array-stream": "^1.0.0",
"sse4_crc32": "^3.1.0",
"stream-events": "^1.0.1",
"stream-forward": "^2.0.0",
Expand Down
49 changes: 19 additions & 30 deletions test/datastore/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ var assert = require('assert');
var ByteBuffer = require('bytebuffer');
var entity = require('../../lib/datastore/entity.js');
var extend = require('extend');
var isStreamEnded = require('is-stream-ended');
var mockery = require('mockery');
var mockRespGet = require('../testdata/response_get.json');
var pb = require('../../lib/datastore/pb.js');
Expand Down Expand Up @@ -181,23 +180,19 @@ describe('Request', function() {
});

describe('callback mode', function() {
it('should execute callback with error & API response', function(done) {
request.get(key, function(err, entity, apiResponse_) {
it('should execute callback with error', function(done) {
request.get(key, function(err) {
assert.strictEqual(err, error);
assert.strictEqual(entity, null);
assert.strictEqual(apiResponse_, apiResponse);
done();
});
});
});

describe('stream mode', function() {
it('should emit error & API response', function(done) {
it('should emit error', function(done) {
request.get(key)
.on('error', function(err, apiResponse_) {
.on('error', function(err) {
assert.strictEqual(err, error);
assert.strictEqual(apiResponse_, apiResponse);

done();
});
});
Expand All @@ -207,7 +202,7 @@ describe('Request', function() {

stream.on('error', function() {
setImmediate(function() {
assert.strictEqual(isStreamEnded(stream), true);
assert.strictEqual(stream._readableState.ended, true);
done();
});
});
Expand Down Expand Up @@ -238,39 +233,34 @@ describe('Request', function() {
it('should format the results', function(done) {
entityOverrides.formatArray = function(arr) {
assert.strictEqual(arr, apiResponse.found);
done();
setImmediate(done);
return arr;
};

request.get(key, assert.ifError);
});

it('should continue looking for deferred results', function(done) {
var lookupCount = 0;

request.makeReq_ = function(method, req, callback) {
lookupCount++;

if (lookupCount === 1) {
callback(null, apiResponseWithDeferred);
return;
}

if (lookupCount > 1) {
done();
}
callback(null, apiResponseWithDeferred);
};

request.get(key, assert.ifError);

request.get = function(keys) {
var expectedKeys = apiResponseWithDeferred.deferred
.map(entity.keyFromKeyProto);

assert.deepEqual(keys, expectedKeys);
done();
};
});

describe('callback mode', function() {
it('should exec callback with results & API response', function(done) {
request.get(key, function(err, entity, apiResponse_) {
it('should exec callback with results', function(done) {
request.get(key, function(err, entity) {
assert.ifError(err);

assert.deepEqual(entity, expectedResult);
assert.strictEqual(apiResponse_, apiResponse);

done();
});
});
Expand All @@ -280,13 +270,12 @@ describe('Request', function() {
callback(null, apiResponseWithMultiEntities);
};

request.get([key, key], function(err, entities, apiResponse) {
request.get([key, key], function(err, entities) {
assert.ifError(err);

assert.strictEqual(util.is(entities, 'array'), true);
assert.deepEqual(entities, expectedResults);

assert.strictEqual(apiResponse, apiResponseWithMultiEntities);
done();
});
});
Expand Down