Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,49 @@
package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorAsBackgroundResource;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.NoHeaderProvider;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStub;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.TopicNames;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/**
Expand All @@ -93,21 +86,18 @@ public class Publisher {
private final String topicName;

private final BatchingSettings batchingSettings;
private final RetrySettings retrySettings;
private final LongRandom longRandom;

private final Lock messagesBatchLock;
private List<OutstandingPublish> messagesBatch;
private int batchedBytes;

private final AtomicBoolean activeAlarm;

private final Channel channel;
@Nullable private final CallCredentials callCredentials;
private final PublisherStub publisherStub;

private final ScheduledExecutorService executor;
private final AtomicBoolean shutdown;
private final List<AutoCloseable> closeables = new ArrayList<>();
private final List<AutoCloseable> closeables;
private final MessageWaiter messagesWaiter;
private ScheduledFuture<?> currentAlarmFuture;

Expand All @@ -125,40 +115,43 @@ private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;

this.batchingSettings = builder.batchingSettings;
this.retrySettings = builder.retrySettings;
this.longRandom = builder.longRandom;

messagesBatch = new LinkedList<>();
messagesBatchLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
executor = builder.executorProvider.getExecutor();
if (builder.executorProvider.shouldAutoClose()) {
closeables.add(new ExecutorAsBackgroundResource(executor));
closeables =
Collections.<AutoCloseable>singletonList(new ExecutorAsBackgroundResource(executor));
} else {
closeables = Collections.emptyList();
}
TransportChannelProvider channelProvider = builder.channelProvider;
if (channelProvider.needsExecutor()) {
channelProvider = channelProvider.withExecutor(executor);
}
if (channelProvider.needsHeaders()) {
Map<String, String> headers =
ImmutableMap.<String, String>builder()
.putAll(builder.headerProvider.getHeaders())
.putAll(builder.internalHeaderProvider.getHeaders())
.build();
channelProvider = channelProvider.withHeaders(headers);
}
if (channelProvider.needsEndpoint()) {
channelProvider = channelProvider.withEndpoint(TopicAdminSettings.getDefaultEndpoint());
}
GrpcTransportChannel transportChannel =
(GrpcTransportChannel) channelProvider.getTransportChannel();
channel = transportChannel.getChannel();
if (channelProvider.shouldAutoClose()) {
closeables.add(transportChannel);

// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
// We post-process this here to keep backward-compatibility.
RetrySettings retrySettings = builder.retrySettings;
if (retrySettings.getMaxAttempts() == 0) {
retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build();
}

Credentials credentials = builder.credentialsProvider.getCredentials();
callCredentials = credentials == null ? null : MoreCallCredentials.from(credentials);
PublisherStubSettings.Builder stubSettings =
PublisherStubSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setExecutorProvider(FixedExecutorProvider.create(executor))
.setTransportChannelProvider(builder.channelProvider);
stubSettings
.publishSettings()
.setRetryableCodes(
StatusCode.Code.ABORTED,
StatusCode.Code.CANCELLED,
StatusCode.Code.DEADLINE_EXCEEDED,
StatusCode.Code.INTERNAL,
StatusCode.Code.RESOURCE_EXHAUSTED,
StatusCode.Code.UNKNOWN,
StatusCode.Code.UNAVAILABLE)
.setRetrySettings(retrySettings)
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());

shutdown = new AtomicBoolean(false);
messagesWaiter = new MessageWaiter();
Expand Down Expand Up @@ -320,21 +313,9 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
publishRequest.addMessages(outstandingPublish.message);
}

long rpcTimeoutMs =
Math.round(
retrySettings.getInitialRpcTimeout().toMillis()
* Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBatch.attempt - 1));
rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().toMillis());

PublisherFutureStub stub =
PublisherGrpc.newFutureStub(channel).withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS);
if (callCredentials != null) {
stub = stub.withCallCredentials(callCredentials);
}

Futures.addCallback(
stub.publish(publishRequest.build()),
new FutureCallback<PublishResponse>() {
ApiFutures.addCallback(
publisherStub.publishCallable().futureCall(publishRequest.build()),
new ApiFutureCallback<PublishResponse>() {
@Override
public void onSuccess(PublishResponse result) {
try {
Expand Down Expand Up @@ -364,37 +345,13 @@ public void onSuccess(PublishResponse result) {

@Override
public void onFailure(Throwable t) {
long nextBackoffDelay =
computeNextBackoffDelayMs(outstandingBatch, retrySettings, longRandom);

if (!isRetryable(t)
|| retrySettings.getMaxAttempts() > 0
&& outstandingBatch.getAttempt() > retrySettings.getMaxAttempts()
|| System.currentTimeMillis() + nextBackoffDelay
> outstandingBatch.creationTime + retrySettings.getTotalTimeout().toMillis()) {
try {
ApiException gaxException =
ApiExceptionFactory.createException(
t, GrpcStatusCode.of(Status.fromThrowable(t).getCode()), false);
for (OutstandingPublish outstandingPublish :
outstandingBatch.outstandingPublishes) {
outstandingPublish.publishResult.setException(gaxException);
}
} finally {
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
try {
for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) {
outstandingPublish.publishResult.setException(t);
}
return;
} finally {
messagesWaiter.incrementPendingMessages(-outstandingBatch.size());
}

executor.schedule(
new Runnable() {
@Override
public void run() {
publishOutstandingBatch(outstandingBatch);
}
},
nextBackoffDelay,
TimeUnit.MILLISECONDS);
}
});
}
Expand Down Expand Up @@ -459,43 +416,13 @@ public void shutdown() throws Exception {
for (AutoCloseable closeable : closeables) {
closeable.close();
}
publisherStub.shutdown();
}

private boolean hasBatchingBytes() {
return getMaxBatchBytes() > 0;
}

private static long computeNextBackoffDelayMs(
OutstandingBatch outstandingBatch, RetrySettings retrySettings, LongRandom longRandom) {
long delayMillis =
Math.round(
retrySettings.getInitialRetryDelay().toMillis()
* Math.pow(retrySettings.getRetryDelayMultiplier(), outstandingBatch.attempt - 1));
delayMillis = Math.min(retrySettings.getMaxRetryDelay().toMillis(), delayMillis);
outstandingBatch.attempt++;
return longRandom.nextLong(0, delayMillis);
}

private boolean isRetryable(Throwable t) {
Status status = Status.fromThrowable(t);
switch (status.getCode()) {
case ABORTED:
case CANCELLED:
case DEADLINE_EXCEEDED:
case INTERNAL:
case RESOURCE_EXHAUSTED:
case UNKNOWN:
case UNAVAILABLE:
return true;
default:
return false;
}
}

interface LongRandom {
long nextLong(long least, long bound);
}

/**
* Constructs a new {@link Builder} using the given topic.
*
Expand Down Expand Up @@ -565,13 +492,6 @@ public static final class Builder {
.setRpcTimeoutMultiplier(2)
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
.build();
static final LongRandom DEFAULT_LONG_RANDOM =
new LongRandom() {
@Override
public long nextLong(long least, long bound) {
return ThreadLocalRandom.current().nextLong(least, bound);
}
};

private static final int THREADS_PER_CPU = 5;
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
Expand All @@ -585,7 +505,6 @@ public long nextLong(long least, long bound) {
BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS;

RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;
LongRandom longRandom = DEFAULT_LONG_RANDOM;

TransportChannelProvider channelProvider =
TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();
Expand Down Expand Up @@ -673,12 +592,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
return this;
}

@InternalApi
Builder setLongRandom(LongRandom longRandom) {
this.longRandom = Preconditions.checkNotNull(longRandom);
return this;
}

/** Gives the ability to set a custom executor to be used by the library. */
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = Preconditions.checkNotNull(executorProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ public void setupScheduleExpectation(Duration delay) {
expectedWorkQueue.add(delay);
}
}

/**
* Blocks the current thread until all the work
* {@link FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been
* scheduled in the executor.
* Blocks the current thread until all the work {@link
* FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been scheduled in
* the executor.
*/
public void waitForExpectedWork() {
synchronized (expectedWorkQueue) {
Expand Down Expand Up @@ -224,7 +224,7 @@ <V> ScheduledFuture<V> schedulePendingCallable(PendingCallable<V> callable) {
}
expectedWorkQueue.notifyAll();
}

return callable.getScheduledFuture();
}

Expand Down
Loading