Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions packages/pubsub/src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,17 @@ Subscription.prototype.ack_ = function(message) {
Subscription.prototype.breakLease_ = function(message) {
var messageIndex = this.inventory_.lease.indexOf(message.ackId);

if (messageIndex === -1) {
return;
}

this.inventory_.lease.splice(messageIndex, 1);
this.inventory_.bytes -= message.length;

if (this.connectionPool) {
if (this.connectionPool.isPaused && !this.hasMaxMessages_()) {
this.connectionPool.resume();
}
var pool = this.connectionPool;

if (pool && pool.isPaused && !this.hasMaxMessages_()) {
pool.resume();
}

if (!this.inventory_.lease.length) {
Expand Down Expand Up @@ -701,7 +705,7 @@ Subscription.prototype.getMetadata = function(gaxOpts, callback) {
* @return {boolean}
*/
Subscription.prototype.hasMaxMessages_ = function() {
return this.inventory_.lease.length >= this.flowControl.maxMessages ||
return this.inventory_.lease.length > this.flowControl.maxMessages ||
this.inventory_.bytes >= this.flowControl.maxBytes;
};

Expand Down Expand Up @@ -859,11 +863,16 @@ Subscription.prototype.openConnection_ = function() {
});

pool.on('message', function(message) {
self.emit('message', self.leaseMessage_(message));
if (!self.hasMaxMessages_()) {
self.emit('message', self.leaseMessage_(message));
return;
}

if (self.hasMaxMessages_() && !pool.isPaused) {
if (!pool.isPaused) {
pool.pause();
}

message.nack();
});

pool.once('connected', function() {
Expand Down
24 changes: 24 additions & 0 deletions packages/pubsub/system-test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,30 @@ describe('pubsub', function() {
setTimeout(() => subscription.close(done), 2500);
}
});

it('should respect flow control limits', function(done) {
var maxMessages = 3;
var messageCount = 0;

var subscription = topic.subscription(SUB_NAMES[0], {
flowControl: {
maxMessages: maxMessages
}
});

subscription.on('error', done);
subscription.on('message', onMessage);

function onMessage() {
if (++messageCount < (maxMessages + 1)) {
return;
}

setImmediate(function() {
subscription.close(done);
});
}
});
});

describe('IAM', function() {
Expand Down
11 changes: 10 additions & 1 deletion packages/pubsub/test/connection-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -1170,13 +1170,22 @@ describe('ConnectionPool', function() {
});

it('should reset internal used props', function() {
var fakeDate = Date.now();
var dateNow = Date.now;

global.Date.now = function() {
return fakeDate;
};

pool.failedConnectionAttempts = 100;
pool.noConnectionsTime = 0;

pool.open();

assert.strictEqual(pool.failedConnectionAttempts, 0);
assert.strictEqual(pool.noConnectionsTime, Date.now());
assert.strictEqual(pool.noConnectionsTime, fakeDate);

global.Date.now = dateNow;
});

it('should listen for newListener events', function() {
Expand Down
38 changes: 34 additions & 4 deletions packages/pubsub/test/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,19 @@ describe('Subscription', function() {
assert.strictEqual(subscription.inventory_.bytes, 0);
});

it('should noop for unknown messages', function() {
var message = {
ackId: 'def',
data: new Buffer('world'),
length: 5
};

subscription.breakLease_(message);

assert.strictEqual(subscription.inventory_.lease.length, 1);
assert.strictEqual(subscription.inventory_.bytes, 5);
});

describe('with connection pool', function() {
it('should resume receiving messages if paused', function(done) {
subscription.connectionPool = {
Expand Down Expand Up @@ -1133,9 +1146,9 @@ describe('Subscription', function() {
});

describe('hasMaxMessages_', function() {
it('should return true if the number of leases == maxMessages', function() {
it('should return true if the number of leases > maxMessages', function() {
subscription.inventory_.lease = ['a', 'b', 'c'];
subscription.flowControl.maxMessages = 3;
subscription.flowControl.maxMessages = 2;

assert(subscription.hasMaxMessages_());
});
Expand Down Expand Up @@ -1432,7 +1445,7 @@ describe('Subscription', function() {
});

it('should pause the pool if sub is at max messages', function(done) {
var message = {};
var message = { nack: fakeUtil.noop };
var leasedMessage = {};

subscription.leaseMessage_ = function() {
Expand All @@ -1450,7 +1463,7 @@ describe('Subscription', function() {
});

it('should not re-pause the pool', function(done) {
var message = {};
var message = { nack: fakeUtil.noop };
var leasedMessage = {};

subscription.leaseMessage_ = function() {
Expand All @@ -1472,6 +1485,23 @@ describe('Subscription', function() {
done();
});

it('should nack messages if over limit', function(done) {
var message = { nack: done };
var leasedMessage = {};

subscription.leaseMessage_ = function() {
return leasedMessage;
};

subscription.hasMaxMessages_ = function() {
return true;
};

subscription.openConnection_();
subscription.connectionPool.isPaused = true;
subscription.connectionPool.emit('message', message);
});

it('should flush the queue when connected', function(done) {
subscription.flushQueues_ = done;

Expand Down