Description
Environment details
- OS: Debian Jessie
- Node.js version: 8.5.0
- google-cloud-node version: 0.14.2
Steps to reproduce
I think there is an issue with queue flushing and reconnecting in the PubSub client. Here's a minimal example to reproduce it (locally, with the latest version of PubSub emulator):
const pubsub = require('@google-cloud/pubsub')();

let last = Date.now();

function onError(err) {
  console.log('ON ERROR', err);
}

function onMessage(message) {
  console.log('ON MESSAGE', message.attributes, (Date.now() - last) / 1000);
  last = Date.now();
  message.ack();
}

const topicName = `my-topic-${Math.floor(Math.random() * 10000)}`;
const subscriptionName = `my-subscription-${Math.floor(Math.random() * 10000)}`;

pubsub.createTopic(topicName, () => {
  const topic = pubsub.topic(topicName);
  const publisher = topic.publisher();

  topic.createSubscription(subscriptionName, () => {
    const subscription = pubsub.subscription(subscriptionName);
    subscription.on('error', onError);
    subscription.on('message', onMessage);

    publisher.publish(new Buffer('x'.repeat(4000)), { batch: 'first' }, (err, messageId) => {
      console.log('DONE publishing immediately', err, messageId);
    });

    setTimeout(() => {
      publisher.publish(new Buffer('x'.repeat(4000)), { batch: 'second' }, (err, messageId) => {
        console.log('DONE publishing after timeout', err, messageId);
      });
    }, 2000);
  });
});

process.on('SIGINT', () => {
  const subscription = pubsub.subscription(subscriptionName);
  subscription.removeListener('message', onMessage);
  subscription.removeListener('error', onError);
  process.exit();
});

Output:
DONE publishing immediately null 229
ON MESSAGE { batch: 'first' } 1.36
DONE publishing after timeout null 230
ON MESSAGE { batch: 'second' } 1.988
ON MESSAGE { batch: 'second' } 10.008
ON MESSAGE { batch: 'second' } 10.006
As you can see, the second message is redelivered over and over again, roughly every 10 seconds.
What I could find out so far is the following:
In
https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/subscription.js#L261
this.connectionPool.isConnected() always returns false for me.
Digging deeper I found out that in
https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/connection-pool.js#L364
connection.isConnected is always undefined for me, hence ConnectionPool.isConnected() always returns false.
So since connectionPool.isConnected() always returns false, the first ack() call will call setFlushTimeout_() in:
https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/subscription.js#L263
This will in fact run this.flushQueues_() exactly once (so the first message is actually acknowledged), but since this.flushQueues_() does not reset this.flushTimeoutHandle_ all subsequent calls to this.setFlushTimeout_() will bail out here:
https://github.com/GoogleCloudPlatform/google-cloud-node/blob/badd1b8e32cf0bbcab6265fa4975c8c25f7116d0/packages/pubsub/src/subscription.js#L980-L982
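To make the suspected sequence easier to follow, here is a small stand-alone model of it. This is only a sketch of the behaviour described above, not the library's code: the class FlushModel, the pendingAcks array, the flushCount counter and the injected schedule function are all made up for illustration. Only the names flushTimeoutHandle_, setFlushTimeout_() and flushQueues_() are taken from subscription.js.

```javascript
// Hypothetical, simplified model of the behaviour described in this issue.
// It is NOT the library's source; it only mirrors the three names quoted above.
class FlushModel {
  constructor(schedule) {
    this.schedule = schedule;        // injected setTimeout-like function (assumption)
    this.flushTimeoutHandle_ = null;
    this.pendingAcks = [];           // illustrative stand-in for the ack queue
    this.flushCount = 0;             // how often flushQueues_() actually ran
  }

  ack(id) {
    this.pendingAcks.push(id);
    this.setFlushTimeout_();
  }

  setFlushTimeout_() {
    // Bails out as soon as a handle exists, even if its timer already fired.
    if (this.flushTimeoutHandle_) {
      return;
    }
    this.flushTimeoutHandle_ = this.schedule(() => this.flushQueues_());
  }

  flushQueues_() {
    // The handle is never reset here, which is the suspected problem.
    this.flushCount += 1;
    this.pendingAcks = [];
  }
}

// Tiny manual scheduler so the example runs synchronously.
const pendingTimers = [];
const schedule = (fn) => {
  pendingTimers.push(fn);
  return pendingTimers.length; // truthy handle, like a real timer id
};
const runTimers = () => {
  while (pendingTimers.length > 0) {
    pendingTimers.shift()();
  }
};

const model = new FlushModel(schedule);

model.ack('first');
runTimers();   // the first flush runs, 'first' is acknowledged

model.ack('second');
runTimers();   // nothing was scheduled, so nothing runs

console.log(model.flushCount, model.pendingAcks);
```

In this model flushCount stays at 1 and 'second' remains in pendingAcks, which matches the repeated redelivery of the second message in the output above.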
So basically I end up with two questions:
- Shouldn't this.flushQueues_() make sure to always unset this.flushTimeoutHandle_ after the queues have been flushed? Otherwise this.setFlushTimeout_() can effectively be called only exactly once during the runtime of the process to trigger this.flushQueues_(), all subsequent calls will have no effect. I assume this is a bug.
- Why is connection.isConnected always undefined? Is that another bug in the connectionPool.isConnected() method?
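For the first question, one possible shape of the change is sketched below on a stand-alone model. It assumes that clearing the handle inside flushQueues_() is safe in the real code, which I have not verified. FixedFlushModel, pendingAcks, flushCount and the injected scheduleTimer function are hypothetical names used only for this illustration.

```javascript
// Hypothetical model of a possible fix; not the library's actual code.
class FixedFlushModel {
  constructor(scheduleTimer) {
    this.scheduleTimer = scheduleTimer; // injected setTimeout-like function (assumption)
    this.flushTimeoutHandle_ = null;
    this.pendingAcks = [];              // illustrative stand-in for the ack queue
    this.flushCount = 0;
  }

  ack(id) {
    this.pendingAcks.push(id);
    this.setFlushTimeout_();
  }

  setFlushTimeout_() {
    if (this.flushTimeoutHandle_) {
      return; // a flush is already scheduled
    }
    this.flushTimeoutHandle_ = this.scheduleTimer(() => this.flushQueues_());
  }

  flushQueues_() {
    // Reset the handle so that a later ack() can schedule a new flush.
    this.flushTimeoutHandle_ = null;
    this.flushCount += 1;
    this.pendingAcks = [];
  }
}

// Tiny manual scheduler so the example runs synchronously.
const queuedTimers = [];
const scheduleTimer = (fn) => {
  queuedTimers.push(fn);
  return queuedTimers.length; // truthy handle, like a real timer id
};
const drainTimers = () => {
  while (queuedTimers.length > 0) {
    queuedTimers.shift()();
  }
};

const fixedModel = new FixedFlushModel(scheduleTimer);

fixedModel.ack('first');
drainTimers();

fixedModel.ack('second');
drainTimers();

console.log(fixedModel.flushCount, fixedModel.pendingAcks);
```

With the handle reset, the model flushes twice and no ack is left queued. Whether the real flushQueues_() should also call clearTimeout() on a still-pending handle is a separate design question that this sketch does not cover.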