diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 207700622fb4..3e45fce3eb11 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -702,9 +702,10 @@ interface MessageConsumer extends AutoCloseable { /** * Pulls messages from the provided subscription. This method possibly returns no messages if no * message was available at the time the request was processed by the Pub/Sub service (i.e. the - * system is not allowed to wait until at least one message is available). Pulled messages have - * their acknowledge deadline automatically renewed until they are explicitly consumed using - * {@link Iterator#next()}. + * system is not allowed to wait until at least one message is available - + * return_immediately + * option is set to {@code true}). Pulled messages have their acknowledge deadline automatically + * renewed until they are explicitly consumed using {@link Iterator#next()}. * *
Example of pulling a maximum number of messages from a subscription. *
{@code
@@ -728,9 +729,12 @@ interface MessageConsumer extends AutoCloseable {
/**
* Sends a request for pulling messages from the provided subscription. This method returns a
* {@code Future} object to consume the result. {@link Future#get()} returns a message iterator.
- * This method possibly returns no messages if no message was available at the time the request
- * was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one
- * message is available).
+ * When using this method the system is allowed to wait until at least one message is available
+ * rather than returning no messages (i.e.
+ * return_immediately
+ * option is set to {@code false}). The client may cancel the request by calling
+ * {@link Future#cancel(boolean)} if it does not wish to wait any longer. Notice that the Pub/Sub
+ * service might still return no messages if a timeout is reached on the service side.
*
* Example of asynchronously pulling a maximum number of messages from a subscription.
*
{@code
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
index 494bb3ff43a1..369d87935cb6 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java
@@ -512,18 +512,13 @@ public Future> listSubscriptionsAsync(String topic,
return listSubscriptionsAsync(topic, getOptions(), optionMap(options));
}
- @Override
- public Iterator pull(String subscription, int maxMessages) {
- return get(pullAsync(subscription, maxMessages));
- }
-
- @Override
- public Future> pullAsync(final String subscription, int maxMessages) {
- PullRequest request = PullRequest.newBuilder().setReturnImmediately(true)
+ private Future> pullAsync(final String subscription,
+ int maxMessages, boolean returnImmediately) {
+ PullRequest request = PullRequest.newBuilder()
.setSubscription(
SubscriberApi.formatSubscriptionName(getOptions().getProjectId(), subscription))
.setMaxMessages(maxMessages)
- .setReturnImmediately(true)
+ .setReturnImmediately(returnImmediately)
.build();
PullFuture future = rpc.pull(request);
future.addCallback(new PubSubRpc.PullCallback() {
@@ -555,6 +550,16 @@ public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessag
});
}
+ @Override
+ public Iterator pull(String subscription, int maxMessages) {
+ return get(pullAsync(subscription, maxMessages, true));
+ }
+
+ @Override
+ public Future> pullAsync(String subscription, int maxMessages) {
+ return pullAsync(subscription, maxMessages, false);
+ }
+
@Override
public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
PullOption... options) {
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java
index 76f321d0ebf9..bbdaab1bd1b2 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java
@@ -68,6 +68,8 @@
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
+import org.joda.time.Duration;
+
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Future;
@@ -77,6 +79,7 @@ public class DefaultPubSubRpc implements PubSubRpc {
private final PublisherApi publisherApi;
private final SubscriberApi subscriberApi;
+ private final SubscriberApi noTimeoutSubscriberApi;
private final ScheduledExecutorService executor;
private final ProviderManager providerManager;
private final ExecutorFactory executorFactory;
@@ -164,6 +167,12 @@ public DefaultPubSubRpc(PubSubOptions options) throws IOException {
.applyToAllApiMethods(callSettingsBuilder);
publisherApi = PublisherApi.create(pubBuilder.build());
subscriberApi = SubscriberApi.create(subBuilder.build());
+ callSettingsBuilder.setRetrySettingsBuilder(callSettingsBuilder.getRetrySettingsBuilder()
+ .setTotalTimeout(Duration.millis(Long.MAX_VALUE))
+ .setInitialRpcTimeout(Duration.millis(Long.MAX_VALUE))
+ .setMaxRpcTimeout(Duration.millis(Long.MAX_VALUE)));
+ subBuilder.applyToAllApiMethods(callSettingsBuilder);
+ noTimeoutSubscriberApi = SubscriberApi.create(subBuilder.build());
} catch (Exception ex) {
throw new IOException(ex);
}
@@ -256,9 +265,14 @@ public Future acknowledge(AcknowledgeRequest request) {
return translate(subscriberApi.acknowledgeCallable().futureCall(request), false);
}
+ private static PullFuture pull(SubscriberApi subscriberApi, PullRequest request) {
+ return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
+ }
+
@Override
public PullFuture pull(PullRequest request) {
- return new PullFutureImpl(translate(subscriberApi.pullCallable().futureCall(request), false));
+ return request.getReturnImmediately()
+ ? pull(subscriberApi, request) : pull(noTimeoutSubscriberApi, request);
}
@Override
@@ -290,6 +304,7 @@ public void close() throws Exception {
}
closed = true;
subscriberApi.close();
+ noTimeoutSubscriberApi.close();
publisherApi.close();
providerManager.getChannel().shutdown();
executorFactory.release(executor);
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java
index b30d7a0f8c7b..d9524af5925e 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java
@@ -549,6 +549,24 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
assertTrue(pubsub().deleteTopic(topic));
}
+ @Test
+ public void testPullMessagesAsyncNonImmediately() throws ExecutionException, InterruptedException {
+ String topic = formatForTest("test-pull-messages-async-non-immediately-topic");
+ pubsub().create(TopicInfo.of(topic));
+ String subscription = formatForTest("test-pull-messages-async-subscription");
+ pubsub().create(SubscriptionInfo.of(topic, subscription));
+ Future> future = pubsub().pullAsync(subscription, 2);
+ Message message1 = Message.of("payload1");
+ Message message2 = Message.of("payload2");
+ List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2));
+ assertEquals(2, messageIds.size());
+ Iterator iterator = future.get();
+ assertEquals(message1.getPayloadAsString(), iterator.next().getPayloadAsString());
+ assertEquals(message2.getPayloadAsString(), iterator.next().getPayloadAsString());
+ assertTrue(pubsub().deleteSubscription(subscription));
+ assertTrue(pubsub().deleteTopic(topic));
+ }
+
@Test
public void testPullAsyncNonExistingSubscription()
throws ExecutionException, InterruptedException {
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
index 8b5c32bbceae..b3fa6d5178fd 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java
@@ -1302,7 +1302,7 @@ public void testPullMessagesAsync() throws ExecutionException, InterruptedExcept
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.setMaxMessages(42)
- .setReturnImmediately(true)
+ .setReturnImmediately(false)
.build();
List messageList = ImmutableList.of(
ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, MESSAGE_PB1),
@@ -1363,7 +1363,7 @@ public void testPullMessagesAsyncError() throws ExecutionException, InterruptedE
PullRequest request = PullRequest.newBuilder()
.setSubscription(SUBSCRIPTION_NAME_PB)
.setMaxMessages(42)
- .setReturnImmediately(true)
+ .setReturnImmediately(false)
.build();
PubSubException exception = new PubSubException(new IOException(), false);
PullFuture futureMock = EasyMock.createStrictMock(PullFuture.class);