diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index d6837422cb8e..b9854fe5f683 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -30,12 +30,16 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.Empty; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; import com.google.pubsub.v1.StreamingPullRequest; import com.google.pubsub.v1.StreamingPullResponse; import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub; import io.grpc.Status; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.Deque; import java.util.List; @@ -56,9 +60,10 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100); private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10); - private static final int MAX_PER_REQUEST_CHANGES = 10000; + private static final int MAX_PER_REQUEST_CHANGES = 1000; + private static final Duration UNARY_TIMEOUT = Duration.ofSeconds(60); - private final SubscriberStub asyncStub; + private final SubscriberStub stub; private final String subscription; private final ScheduledExecutorService systemExecutor; private final MessageDispatcher messageDispatcher; @@ -75,7 +80,7 @@ public StreamingSubscriberConnection( Duration ackExpirationPadding, Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, - SubscriberStub asyncStub, + SubscriberStub stub, FlowController flowController, Deque outstandingMessageBatches, ScheduledExecutorService executor, @@ -83,7 +88,7 @@ public StreamingSubscriberConnection( ApiClock clock) { this.subscription = subscription; this.systemExecutor = systemExecutor; - this.asyncStub = asyncStub; + this.stub = stub; this.messageDispatcher = new MessageDispatcher( receiver, @@ -185,8 +190,7 @@ private void initialize() { final ClientResponseObserver responseObserver = new StreamingPullResponseObserver(errorFuture); final ClientCallStreamObserver requestObserver = - (ClientCallStreamObserver) - (asyncStub.streamingPull(responseObserver)); + (ClientCallStreamObserver) (stub.streamingPull(responseObserver)); logger.log( Level.FINER, "Initializing stream to subscription {0}",subscription); @@ -260,24 +264,52 @@ public void run() { } private boolean isAlive() { - return state() == State.RUNNING || state() == State.STARTING; + State state = state(); // Read the state only once. + return state == State.RUNNING || state == State.STARTING; } @Override public void sendAckOperations( List acksToSend, List ackDeadlineExtensions) { - List requests = - partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES); - lock.lock(); - try { - for (StreamingPullRequest request : requests) { - requestObserver.onNext(request); + SubscriberStub timeoutStub = + stub.withDeadlineAfter(UNARY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + StreamObserver loggingObserver = new StreamObserver() { + @Override + public void onCompleted() { + // noop } - } catch (Exception e) { - Level level = isAlive() ? Level.WARNING : Level.FINER; - logger.log(level, "failed to send ack operations", e); - } finally { - lock.unlock(); + + @Override + public void onNext(Empty e) { + // noop + } + + @Override + public void onError(Throwable t) { + Level level = isAlive() ? Level.WARNING : Level.FINER; + logger.log(level, "failed to send operations", t); + } + }; + + for (PendingModifyAckDeadline modack : ackDeadlineExtensions) { + for (List idChunk : Lists.partition(modack.ackIds, MAX_PER_REQUEST_CHANGES)) { + timeoutStub.modifyAckDeadline( + ModifyAckDeadlineRequest.newBuilder() + .setSubscription(subscription) + .addAllAckIds(idChunk) + .setAckDeadlineSeconds(modack.deadlineExtensionSeconds) + .build(), + loggingObserver); + } + } + + for (List idChunk : Lists.partition(acksToSend, MAX_PER_REQUEST_CHANGES)) { + timeoutStub.acknowledge( + AcknowledgeRequest.newBuilder() + .setSubscription(subscription) + .addAllAckIds(idChunk) + .build(), + loggingObserver); } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index 200e239236ec..f02606db4321 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -168,7 +168,7 @@ public void failed(Subscriber.State from, Throwable failure) { } private MessageAndConsumer pollQueue(BlockingQueue queue) throws InterruptedException { - Object obj = queue.poll(1, TimeUnit.MINUTES); + Object obj = queue.poll(10, TimeUnit.MINUTES); if (obj == null) { return null; }