From 02c617e5f36c2715a1516bcbfbe91101dca081b4 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 30 Mar 2017 18:26:36 +1100 Subject: [PATCH 1/3] pubsub: acquire FlowController before releasing --- .../cloud/pubsub/spi/v1/MessageDispatcher.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index b6b7318e30c7..65727f9cd75f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -260,8 +260,7 @@ public int getMessageDeadlineSeconds() { } public void processReceivedMessages(List responseMessages) { - int receivedMessagesCount = responseMessages.size(); - if (receivedMessagesCount == 0) { + if (responseMessages.size() == 0) { return; } Instant now = new Instant(clock.millisTime()); @@ -276,7 +275,13 @@ public void processReceivedMessages(List r logger.log( Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now}); + try { + flowController.reserve(responseMessages.size(), totalByteCount); + } catch (FlowController.FlowControlException unexpectedException) { + throw new IllegalStateException("Flow control unexpected exception", unexpectedException); + } messagesWaiter.incrementPendingMessages(responseMessages.size()); + Iterator acksIterator = ackHandlers.iterator(); for (ReceivedMessage userMessage : responseMessages) { final PubsubMessage message = userMessage.getMessage(); @@ -308,12 +313,6 @@ public void run() { new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers)); } setupNextAckDeadlineExtensionAlarm(expiration); - - try { - flowController.reserve(receivedMessagesCount, totalByteCount); - } catch (FlowController.FlowControlException unexpectedException) { - throw new IllegalStateException("Flow control unexpected exception", unexpectedException); - } } private void setupPendingAcksAlarm() { From 28d348d2180102affadec4689dd01b04bf5e6ce4 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Tue, 4 Apr 2017 18:07:20 +1000 Subject: [PATCH 2/3] pr comment --- .../pubsub/spi/v1/MessageDispatcher.java | 49 +++++++++++++------ 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 65727f9cd75f..1adb163ca630 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -16,8 +16,8 @@ package com.google.cloud.pubsub.spi.v1; -import com.google.api.gax.core.FlowController; import com.google.api.gax.core.ApiClock; +import com.google.api.gax.core.FlowController; import com.google.api.stats.Distribution; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -28,6 +28,7 @@ import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; @@ -260,28 +261,46 @@ public int getMessageDeadlineSeconds() { } public void processReceivedMessages(List responseMessages) { - if (responseMessages.size() == 0) { + if (responseMessages.isEmpty()) { return; } - Instant now = new Instant(clock.millisTime()); - int totalByteCount = 0; + final ArrayList ackHandlers = new ArrayList<>(responseMessages.size()); for (ReceivedMessage pubsubMessage : responseMessages) { - int messageSize = pubsubMessage.getMessage().getSerializedSize(); - totalByteCount += messageSize; - ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize)); + ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), pubsubMessage.getMessage().getSerializedSize())); } + + Instant now = new Instant(clock.millisTime()); Instant expiration = now.plus(messageDeadlineSeconds * 1000); logger.log( Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now}); + synchronized (outstandingAckHandlers) { + // AckDeadlineAlarm modifies lists in outstandingAckHandlers in-place and might run at any time. + // We will also later iterate over ackHandlers when we give messages to user code. + // We must create a new list to pass to outstandingAckHandlers, + // so that we can't iterate and modify the list concurrently. + outstandingAckHandlers.add( + new ExtensionJob( + expiration, + INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, + new ArrayList(ackHandlers))); + } + setupNextAckDeadlineExtensionAlarm(expiration); + + // Deadline extension must be set up before we reserve flow control. + // Flow control might block for a while, and extension will keep messages from expiring. + try { - flowController.reserve(responseMessages.size(), totalByteCount); - } catch (FlowController.FlowControlException unexpectedException) { - throw new IllegalStateException("Flow control unexpected exception", unexpectedException); + flowController.reserve(responseMessages.size(), totalMessageSize(responseMessages)); + } catch (FlowController.FlowControlException e) { + throw new IllegalStateException("Flow control unexpected exception", e); } messagesWaiter.incrementPendingMessages(responseMessages.size()); + // Reserving flow control must happen before we give the messages to the user, + // otherwise the user code might be given too many messages to process at once. + Iterator acksIterator = ackHandlers.iterator(); for (ReceivedMessage userMessage : responseMessages) { final PubsubMessage message = userMessage.getMessage(); @@ -307,12 +326,14 @@ public void run() { } }); } + } - synchronized (outstandingAckHandlers) { - outstandingAckHandlers.add( - new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers)); + private static int totalMessageSize(Collection messages) { + int total = 0; + for (ReceivedMessage message : messages) { + total += message.getMessage().getSerializedSize(); } - setupNextAckDeadlineExtensionAlarm(expiration); + return total; } private void setupPendingAcksAlarm() { From d08b5f6cd06d8272a8f7b679f8b2bbf6b5a626b2 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 5 Apr 2017 12:59:32 +1000 Subject: [PATCH 3/3] pr comment --- .../pubsub/spi/v1/MessageDispatcher.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java index 1adb163ca630..f349118bcbb8 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java @@ -267,7 +267,9 @@ public void processReceivedMessages(List r final ArrayList ackHandlers = new ArrayList<>(responseMessages.size()); for (ReceivedMessage pubsubMessage : responseMessages) { - ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), pubsubMessage.getMessage().getSerializedSize())); + int size = pubsubMessage.getMessage().getSerializedSize(); + AckHandler handler = new AckHandler(pubsubMessage.getAckId(), size); + ackHandlers.add(handler); } Instant now = new Instant(clock.millisTime()); @@ -275,32 +277,31 @@ public void processReceivedMessages(List r logger.log( Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now}); + // We must add the ackHandlers to outstandingAckHandlers before setting up the deadline extension alarm. + // Otherwise, the alarm might go off before we can add the handlers. synchronized (outstandingAckHandlers) { // AckDeadlineAlarm modifies lists in outstandingAckHandlers in-place and might run at any time. // We will also later iterate over ackHandlers when we give messages to user code. // We must create a new list to pass to outstandingAckHandlers, // so that we can't iterate and modify the list concurrently. + ArrayList ackHandlersCopy = new ArrayList<>(ackHandlers); outstandingAckHandlers.add( - new ExtensionJob( - expiration, - INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, - new ArrayList(ackHandlers))); + new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlersCopy)); } - setupNextAckDeadlineExtensionAlarm(expiration); // Deadline extension must be set up before we reserve flow control. // Flow control might block for a while, and extension will keep messages from expiring. + setupNextAckDeadlineExtensionAlarm(expiration); + // Reserving flow control must happen before we give the messages to the user, + // otherwise the user code might be given too many messages to process at once. try { - flowController.reserve(responseMessages.size(), totalMessageSize(responseMessages)); + flowController.reserve(responseMessages.size(), getTotalMessageSize(responseMessages)); } catch (FlowController.FlowControlException e) { throw new IllegalStateException("Flow control unexpected exception", e); } messagesWaiter.incrementPendingMessages(responseMessages.size()); - // Reserving flow control must happen before we give the messages to the user, - // otherwise the user code might be given too many messages to process at once. - Iterator acksIterator = ackHandlers.iterator(); for (ReceivedMessage userMessage : responseMessages) { final PubsubMessage message = userMessage.getMessage(); @@ -328,7 +329,7 @@ public void run() { } } - private static int totalMessageSize(Collection messages) { + private static int getTotalMessageSize(Collection messages) { int total = 0; for (ReceivedMessage message : messages) { total += message.getMessage().getSerializedSize();