diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java index d659563f9cef..78e4d8fdff4b 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/AckReplyConsumer.java @@ -25,10 +25,12 @@ public interface AckReplyConsumer { * message again. */ void ack(); - + /** * Signals that the message has not been successfully processed. The service should resend the * message. */ void nack(); + + void abandon(); } diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index cda1eefea4cd..b0b3c5fd27b1 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -134,6 +134,7 @@ private class AckHandler implements ApiFutureCallback { private final int outstandingBytes; private final long receivedTimeMillis; private final Instant totalExpiration; + private boolean extending = true; AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) { this.ackId = ackId; @@ -152,6 +153,7 @@ private void forget() { */ return; } + extending = false; flowController.release(1, outstandingBytes); messagesWaiter.incrementPendingMessages(-1); processOutstandingBatches(); @@ -415,6 +417,11 @@ public void ack() { public void nack() { response.set(AckReply.NACK); } + + @Override + public void abandon() { + ackHandler.forget(); + } }; ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); executor.execute( @@ -466,6 +473,9 @@ void extendDeadlines() { Instant extendTo = now.plusSeconds(extendSeconds); for (Map.Entry entry : pendingMessages.entrySet()) { + if (!entry.getValue().extending) { + continue; + } String ackId = entry.getKey(); Instant totalExpiration = entry.getValue().totalExpiration; if (totalExpiration.isAfter(extendTo)) { diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 75296dd89c87..35487174b049 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -148,6 +148,14 @@ public void testNack() throws Exception { assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0)); } + @Test + public void testAbandon() throws Exception { + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + consumers.take().abandon(); + dispatcher.extendDeadlines(); + assertThat(sentModAcks).doesNotContain(TEST_MESSAGE.getAckId()); + } + @Test public void testExtension() throws Exception { dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);