Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/common-grpc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
"dot-prop": "^2.4.0",
"duplexify": "^3.5.0",
"extend": "^3.0.0",
"grpc": "^1.3.1",
"grpc": "^1.6.0",
"is": "^3.2.0",
"modelo": "^4.2.0",
"retry-request": "^2.0.0",
Expand Down
1 change: 1 addition & 0 deletions packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"google-auto-auth": "^0.7.1",
"google-gax": "^0.13.0",
"google-proto-files": "^0.12.0",
"grpc": "^1.6.0",

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

"is": "^3.0.1",
"uuid": "^3.0.1"
},
Expand Down
171 changes: 132 additions & 39 deletions packages/pubsub/src/connection-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ var uuid = require('uuid');
var PKG = require('../package.json');
var v1 = require('./v1');

var CHANNEL_READY_EVENT = 'channel.ready';
var CHANNEL_ERROR_EVENT = 'channel.error';

// if we can't establish a connection within 5 minutes, we need to back off
// and emit an error to the user.
var MAX_TIMEOUT = 300000;

// codes to retry streams
var RETRY_CODES = [
0, // ok
Expand Down Expand Up @@ -62,6 +69,7 @@ function ConnectionPool(subscription) {

this.isPaused = false;
this.isOpen = false;
this.isGettingChannelState = false;

this.failedConnectionAttempts = 0;
this.noConnectionsTime = 0;
Expand All @@ -72,7 +80,6 @@ function ConnectionPool(subscription) {
};

this.queue = [];
events.EventEmitter.call(this);

// grpc related fields we need since we're bypassing gax
this.metadata_ = new grpc.Metadata();
Expand All @@ -83,6 +90,7 @@ function ConnectionPool(subscription) {
'grpc/' + require('grpc/package.json').version
].join(' '));

events.EventEmitter.call(this);
this.open();
}

Expand Down Expand Up @@ -142,16 +150,25 @@ ConnectionPool.prototype.acquire = function(id, callback) {
* @param {?error} callback.error - An error returned while closing the pool.
*/
ConnectionPool.prototype.close = function(callback) {
var self = this;
var connections = Array.from(this.connections.values());

callback = callback || common.util.noop;

this.isOpen = false;
self.connections.clear();
this.queue.forEach(clearTimeout);
if (this.client) {
this.client.close();
}

this.connections.clear();
this.queue.forEach(clearTimeout);
this.queue.length = 0;

this.isOpen = false;
this.isGettingChannelState = false;

this.removeAllListeners('newListener')
.removeAllListeners(CHANNEL_READY_EVENT)
.removeAllListeners(CHANNEL_ERROR_EVENT);

this.failedConnectionAttempts = 0;
this.noConnectionsTime = 0;

Expand All @@ -177,28 +194,60 @@ ConnectionPool.prototype.createConnection = function() {
var connection = client.streamingPull(self.metadata_);
var errorImmediateHandle;

connection.on('error', function(err) {
// since this is a bidi stream it's possible that we recieve errors from
// reads or writes. We also want to try and cut down on the number of
// errors that we emit if other connections are still open. So by using
// setImmediate we're able to cancel the error message if it gets passed
// to the `status` event where we can check if the connection should be
// re-opened or if we should send the error to the user
errorImmediateHandle = setImmediate(self.emit.bind(self), 'error', err);
});
if (self.isPaused) {
connection.pause();
}

connection.on('metadata', function(metadata) {
if (!metadata.get('date').length) {
return;
}
self
.once(CHANNEL_ERROR_EVENT, onChannelError)
.once(CHANNEL_READY_EVENT, onChannelReady);

connection
.on('error', onConnectionError)
.on('data', onConnectionData)
.on('status', onConnectionStatus)
.write({
subscription: common.util.replaceProjectIdToken(
self.subscription.name, self.projectId),
streamAckDeadlineSeconds: self.settings.ackDeadline / 1000
});

self.connections.set(id, connection);

function onChannelError() {
self.removeListener(CHANNEL_READY_EVENT, onChannelReady);

connection.cancel();
}

function onChannelReady() {
self.removeListener(CHANNEL_ERROR_EVENT, onChannelError);

connection.isConnected = true;

self.noConnectionsTime = 0;
self.failedConnectionAttempts = 0;

self.emit('connected', connection);
});
}

// since this is a bidi stream it's possible that we recieve errors from
// reads or writes. We also want to try and cut down on the number of
// errors that we emit if other connections are still open. So by using
// setImmediate we're able to cancel the error message if it gets passed
// to the `status` event where we can check if the connection should be
// re-opened or if we should send the error to the user
function onConnectionError(err) {
errorImmediateHandle = setImmediate(self.emit.bind(self), 'error', err);
}

connection.on('status', function(status) {
function onConnectionData(data) {
arrify(data.receivedMessages).forEach(function(message) {
self.emit('message', self.createMessage(id, message));
});
}

function onConnectionStatus(status) {
clearImmediate(errorImmediateHandle);

connection.end();
Expand All @@ -219,25 +268,7 @@ ConnectionPool.prototype.createConnection = function() {
error.code = status.code;
self.emit('error', error);
}
});

connection.on('data', function(data) {
arrify(data.receivedMessages).forEach(function(message) {
self.emit('message', self.createMessage(id, message));
});
});

if (self.isPaused) {
connection.pause();
}

connection.write({
subscription: common.util.replaceProjectIdToken(
self.subscription.name, self.projectId),
streamAckDeadlineSeconds: self.settings.ackDeadline / 1000
});

self.connections.set(id, connection);
});
};

Expand Down Expand Up @@ -277,6 +308,59 @@ ConnectionPool.prototype.createMessage = function(connectionId, resp) {
};
};

/**
* Gets the channels connectivity state and emits channel events accordingly.
*
* @fires CHANNEL_ERROR_EVENT
* @fires CHANNEL_READY_EVENT
*/
ConnectionPool.prototype.getAndEmitChannelState = function() {
var self = this;

this.isGettingChannelState = true;

this.getClient(function(err, client) {
if (err) {
self.isGettingChannelState = false;
self.emit(CHANNEL_ERROR_EVENT);
self.emit('error', err);
return;
}

var READY_STATE = 2;

var channel = client.getChannel();
var connectivityState = channel.getConnectivityState(false);

if (connectivityState === READY_STATE) {
self.isGettingChannelState = false;
self.emit(CHANNEL_READY_EVENT);
return;
}

var elapsedTimeWithoutConnection = 0;
var now = Date.now();
var deadline;

if (self.noConnectionsTime) {
elapsedTimeWithoutConnection = now - self.noConnectionsTime;
}

deadline = now + (MAX_TIMEOUT - elapsedTimeWithoutConnection);

client.waitForReady(deadline, function(err) {
self.isGettingChannelState = false;

if (err) {
self.emit(CHANNEL_ERROR_EVENT, err);
return;
}

self.emit(CHANNEL_READY_EVENT);
});
});
};

/**
* Gets the Subscriber client. We need to bypass GAX until they allow deadlines
* to be optional.
Expand Down Expand Up @@ -312,6 +396,7 @@ ConnectionPool.prototype.getClient = function(callback) {
}

self.client = new Subscriber(address, credentials, {
'grpc.keepalive_time_ms': MAX_TIMEOUT,
'grpc.max_receive_message_length': 20000001,
'grpc.primary_user_agent': common.util.getUserAgentFromPackageJson(PKG)
});
Expand Down Expand Up @@ -375,6 +460,8 @@ ConnectionPool.prototype.isConnected = function() {
* Creates specified number of connections and puts pool in open state.
*/
ConnectionPool.prototype.open = function() {
var self = this;

var existing = this.connections.size;
var max = this.settings.maxConnections;

Expand All @@ -385,6 +472,12 @@ ConnectionPool.prototype.open = function() {
this.isOpen = true;
this.failedConnectionAttempts = 0;
this.noConnectionsTime = Date.now();

this.on('newListener', function(eventName) {
if (eventName === CHANNEL_READY_EVENT && !self.isGettingChannelState) {
self.getAndEmitChannelState();
}
});
};

/**
Expand Down Expand Up @@ -450,7 +543,7 @@ ConnectionPool.prototype.shouldReconnect = function(status) {
}

var exceededRetryLimit = this.noConnectionsTime &&
Date.now() - this.noConnectionsTime > 300000;
Date.now() - this.noConnectionsTime > MAX_TIMEOUT;

if (exceededRetryLimit) {
return false;
Expand Down
22 changes: 15 additions & 7 deletions packages/pubsub/src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ function Subscription(pubsub, name, options) {
this.flushTimeoutHandle_ = null;
this.leaseTimeoutHandle_ = null;
this.userClosed_ = false;
this.isOpen = false;

events.EventEmitter.call(this);
this.messageListeners = 0;
Expand Down Expand Up @@ -328,12 +329,12 @@ Subscription.prototype.breakLease_ = function(message) {
Subscription.prototype.close = function(callback) {
this.userClosed_ = true;

var inventory = this.inventory_;
inventory.lease.length = inventory.bytes = 0;

clearTimeout(this.leaseTimeoutHandle_);
this.leaseTimeoutHandle_ = null;

clearTimeout(this.flushTimeoutHandle_);
this.flushTimeoutHandle_ = null;

this.flushQueues_();
this.closeConnection_(callback);
};
Expand All @@ -347,6 +348,8 @@ Subscription.prototype.close = function(callback) {
* @param {?error} err - An error returned from this request.
*/
Subscription.prototype.closeConnection_ = function(callback) {
this.isOpen = false;

if (this.connectionPool) {
this.connectionPool.close(callback || common.util.noop);
this.connectionPool = null;
Expand Down Expand Up @@ -520,6 +523,9 @@ Subscription.prototype.exists = function(callback) {
Subscription.prototype.flushQueues_ = function() {
var self = this;

clearTimeout(this.flushTimeoutHandle_);

This comment was marked as spam.

This comment was marked as spam.

this.flushTimeoutHandle_ = null;

var acks = this.inventory_.ack;
var nacks = this.inventory_.nack;

Expand Down Expand Up @@ -846,6 +852,8 @@ Subscription.prototype.openConnection_ = function() {
var self = this;
var pool = this.connectionPool = new ConnectionPool(this);

this.isOpen = true;

pool.on('error', function(err) {
self.emit('error', err);
});
Expand All @@ -859,8 +867,6 @@ Subscription.prototype.openConnection_ = function() {
});

pool.once('connected', function() {
clearTimeout(self.flushTimeoutHandle_);
self.flushTimeoutHandle_ = null;
self.flushQueues_();
});
};
Expand All @@ -874,14 +880,16 @@ Subscription.prototype.openConnection_ = function() {
Subscription.prototype.renewLeases_ = function() {
var self = this;

clearTimeout(this.leaseTimeoutHandle_);
this.leaseTimeoutHandle_ = null;

if (!this.inventory_.lease.length) {
return;
}

var ackIds = this.inventory_.lease;
this.ackDeadline = this.histogram.percentile(99);

var ackIds = this.inventory_.lease.slice();
var ackDeadlineSeconds = this.ackDeadline / 1000;

if (this.connectionPool) {
Expand Down Expand Up @@ -991,7 +999,7 @@ Subscription.prototype.setFlushTimeout_ = function() {
* @private
*/
Subscription.prototype.setLeaseTimeout_ = function() {
if (this.leaseTimeoutHandle_) {
if (this.leaseTimeoutHandle_ || !this.isOpen) {
return;
}

Expand Down
Loading