Description
Environment details
- OS: Kubernetes Engine
- Node.js version: 9.3.0
- npm version: –
- @google-cloud/pubsub version: 0.16.2
Steps to reproduce
I have a consumer running on Kubernetes Engine which reads messages from a PubSub subscription and writes them to BigQuery.
I have implemented clean shutdown logic for when my pods get terminated (e.g. during a rolling update). In that shutdown logic I call subscription.close().
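For context, the shutdown logic looks roughly like this (a minimal sketch; `shutdownGracefully` is a hypothetical helper name, and `subscription` stands for the Subscription object returned by the client):

```javascript
// Hypothetical helper: wrap subscription.close() in a promise so the
// process can wait for pending acks/nacks to be flushed before exiting.
function shutdownGracefully(subscription) {
  return new Promise(function(resolve, reject) {
    subscription.close(function(err) {
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}

// Kubernetes sends SIGTERM before killing the pod during a rolling update.
process.on('SIGTERM', function() {
  shutdownGracefully(subscription)
    .then(function() { process.exit(0); })
    .catch(function(err) {
      console.error('shutdown failed', err);
      process.exit(1);
    });
});
```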
Occasionally calling subscription.close() leads to the following error being emitted in the subscription.on('error') event:
```
Error: Request payload size exceeds the limit: 524288 bytes.
    at /app/node_modules/grpc/src/client.js:554:15
```
If I inspect the error that I receive in the subscription.on('error') handler, I can see that the stack trace contains either:
```
    at Subscription.emit (events.js:159:13)
    at /app/node_modules/@google-cloud/pubsub/src/subscription.js:346:10
    at <anonymous>
```
or:
```
    at Subscription.emit (events.js:159:13)
    at /app/node_modules/@google-cloud/pubsub/src/subscription.js:869:10
    at <anonymous>
```
I was able to trace the problem down as follows:
When subscription.close() is called, this in turn calls flushQueues_():
nodejs-pubsub/src/subscription.js
Lines 381 to 395 in d89287c
```js
Subscription.prototype.close = function(callback) {
  var self = this;
  this.userClosed_ = true;
  var inventory = this.inventory_;
  inventory.lease.length = inventory.bytes = 0;
  clearTimeout(this.leaseTimeoutHandle_);
  this.leaseTimeoutHandle_ = null;
  this.flushQueues_().then(function() {
    self.closeConnection_(callback);
  });
};
```
Within flushQueues_(), in case there are pending acks or nacks, acknowledge_() or modifyAckDeadline_() is called with all pending ackIds/nackIds, respectively:
nodejs-pubsub/src/subscription.js
Lines 631 to 647 in d89287c
```js
if (acks.length) {
  requests.push(
    this.acknowledge_(acks).then(function() {
      self.inventory_.ack = [];
    })
  );
}

if (nacks.length) {
  requests.push(
    this.modifyAckDeadline_(nacks, 0).then(function() {
      self.inventory_.nack = [];
    })
  );
}

return Promise.all(requests);
```
Both methods are quite similar: each simply makes a single API call with all the ackIds/nackIds that were passed in from flushQueues_().
Here's acknowledge_():
nodejs-pubsub/src/subscription.js
Line 318 in d89287c
```js
connection.write({ackIds}, resolve);
```
And here's modifyAckDeadline_():
nodejs-pubsub/src/subscription.js
Lines 899 to 905 in d89287c
```js
connection.write(
  {
    modifyDeadlineAckIds: ackIds,
    modifyDeadlineSeconds: Array(ackIds.length).fill(deadline),
  },
  resolve
);
```
Apparently these API calls can become too big: the corresponding promises reject with an error that is then emitted and surfaces in my application code:
nodejs-pubsub/src/subscription.js
Lines 323 to 325 in d89287c
```js
return promise.catch(function(err) {
  self.emit('error', err);
});
```
nodejs-pubsub/src/subscription.js
Lines 910 to 912 in d89287c
```js
return promise.catch(function(err) {
  self.emit('error', err);
});
```
The error message mentions 524288 bytes which is precisely 512 KiB.
My suspicion is that the acks/nacks need to be sent in batches whenever they would exceed the maximum payload size for acknowledge/modifyAckDeadline requests.
Unfortunately the REST API docs don't say anything about limits.
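If batching is indeed required, a client-side fix could look roughly like the sketch below. To be clear, this is my assumption, not anything from the library: the 512 KiB limit is inferred from the error message, and the per-chunk size heuristic (summing the UTF-8 byte lengths of the ackIds, ignoring protobuf framing overhead) is mine:

```javascript
// Hypothetical sketch: split ackIds into chunks so that each
// acknowledge/modifyAckDeadline request stays under the observed
// payload limit. The byte estimate only counts the ackId strings
// themselves; a real implementation would leave headroom for
// message framing.
var MAX_PAYLOAD_BYTES = 512 * 1024; // 524288, as seen in the error

function chunkAckIds(ackIds, maxBytes) {
  var chunks = [];
  var current = [];
  var currentBytes = 0;

  ackIds.forEach(function(ackId) {
    var size = Buffer.byteLength(ackId, 'utf8');
    if (current.length && currentBytes + size > maxBytes) {
      chunks.push(current);
      current = [];
      currentBytes = 0;
    }
    current.push(ackId);
    currentBytes += size;
  });

  if (current.length) {
    chunks.push(current);
  }
  return chunks;
}
```

acknowledge_() and modifyAckDeadline_() could then call connection.write() once per chunk and wait for all writes to complete, instead of writing all pending ackIds in a single request.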
In any case, I would expect subscription.close() to allow a clean shutdown even when there are many pending acks/nacks. The current behavior leads to potential duplicates on my end, since pending acks are not correctly flushed: because sending the pending acks itself triggers an error and a subsequent crash in my application code, the corresponding messages will be re-consumed after my application restarts, even though they have already been processed.