From 20a5bccc57771cf2a08fa36cf1fbda7b83270012 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 4 Apr 2018 12:09:17 -0700 Subject: [PATCH] pubsub: use unary calls to send acks/modacks Previously we send ack/modack requests on the StreamingPull streams. However, if pull responses are buffered, gRPC cannot promptly tell us that the stream is broken, so we'll keep sending requests on the dead stream. This commit implements a temporary solution. We send requests by unary RPCs instead and only use streaming for receiving messages. Load test shows no regression. However, modacks seem to take longer to take effect. The timeout for nack test had to be increased. --- .../v1/StreamingSubscriberConnection.java | 68 ++++++++++++++----- .../google/cloud/pubsub/it/ITPubSubTest.java | 2 +- 2 files changed, 51 insertions(+), 19 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index d6837422cb8e..b9854fe5f683 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -30,12 +30,16 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.Empty; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.ModifyAckDeadlineRequest; import com.google.pubsub.v1.StreamingPullRequest; import com.google.pubsub.v1.StreamingPullResponse; import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub; import io.grpc.Status; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.Deque; import java.util.List; @@ -56,9 +60,10 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100); private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10); - private static final int MAX_PER_REQUEST_CHANGES = 10000; + private static final int MAX_PER_REQUEST_CHANGES = 1000; + private static final Duration UNARY_TIMEOUT = Duration.ofSeconds(60); - private final SubscriberStub asyncStub; + private final SubscriberStub stub; private final String subscription; private final ScheduledExecutorService systemExecutor; private final MessageDispatcher messageDispatcher; @@ -75,7 +80,7 @@ public StreamingSubscriberConnection( Duration ackExpirationPadding, Duration maxAckExtensionPeriod, Distribution ackLatencyDistribution, - SubscriberStub asyncStub, + SubscriberStub stub, FlowController flowController, Deque outstandingMessageBatches, ScheduledExecutorService executor, @@ -83,7 +88,7 @@ public StreamingSubscriberConnection( ApiClock clock) { this.subscription = subscription; this.systemExecutor = systemExecutor; - this.asyncStub = asyncStub; + this.stub = stub; this.messageDispatcher = new MessageDispatcher( receiver, @@ -185,8 +190,7 @@ private void initialize() { final ClientResponseObserver responseObserver = new StreamingPullResponseObserver(errorFuture); final ClientCallStreamObserver requestObserver = - (ClientCallStreamObserver) - (asyncStub.streamingPull(responseObserver)); + (ClientCallStreamObserver) (stub.streamingPull(responseObserver)); logger.log( Level.FINER, "Initializing stream to subscription {0}",subscription); @@ -260,24 +264,52 @@ public void run() { } private boolean isAlive() { - return state() == State.RUNNING || state() == State.STARTING; + State state = state(); // Read the state only once. + return state == State.RUNNING || state == State.STARTING; } @Override public void sendAckOperations( List acksToSend, List ackDeadlineExtensions) { - List requests = - partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES); - lock.lock(); - try { - for (StreamingPullRequest request : requests) { - requestObserver.onNext(request); + SubscriberStub timeoutStub = + stub.withDeadlineAfter(UNARY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + StreamObserver loggingObserver = new StreamObserver() { + @Override + public void onCompleted() { + // noop } - } catch (Exception e) { - Level level = isAlive() ? Level.WARNING : Level.FINER; - logger.log(level, "failed to send ack operations", e); - } finally { - lock.unlock(); + + @Override + public void onNext(Empty e) { + // noop + } + + @Override + public void onError(Throwable t) { + Level level = isAlive() ? Level.WARNING : Level.FINER; + logger.log(level, "failed to send operations", t); + } + }; + + for (PendingModifyAckDeadline modack : ackDeadlineExtensions) { + for (List idChunk : Lists.partition(modack.ackIds, MAX_PER_REQUEST_CHANGES)) { + timeoutStub.modifyAckDeadline( + ModifyAckDeadlineRequest.newBuilder() + .setSubscription(subscription) + .addAllAckIds(idChunk) + .setAckDeadlineSeconds(modack.deadlineExtensionSeconds) + .build(), + loggingObserver); + } + } + + for (List idChunk : Lists.partition(acksToSend, MAX_PER_REQUEST_CHANGES)) { + timeoutStub.acknowledge( + AcknowledgeRequest.newBuilder() + .setSubscription(subscription) + .addAllAckIds(idChunk) + .build(), + loggingObserver); } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java index 200e239236ec..f02606db4321 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/it/ITPubSubTest.java @@ -168,7 +168,7 @@ public void failed(Subscriber.State from, Throwable failure) { } private MessageAndConsumer pollQueue(BlockingQueue queue) throws InterruptedException { - Object obj = queue.poll(1, TimeUnit.MINUTES); + Object obj = queue.poll(10, TimeUnit.MINUTES); if (obj == null) { return null; }