diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index e09cacbb40cc..56895d66b400 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -17,56 +17,49 @@ package com.google.cloud.pubsub.v1; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; -import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorAsBackgroundResource; import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.grpc.GrpcStatusCode; -import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiExceptionFactory; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.NoHeaderProvider; +import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.TransportChannelProvider; -import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub; +import com.google.cloud.pubsub.v1.stub.PublisherStub; +import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; -import com.google.pubsub.v1.PublisherGrpc; -import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import com.google.pubsub.v1.TopicNames; -import io.grpc.CallCredentials; -import io.grpc.Channel; import io.grpc.Status; -import io.grpc.auth.MoreCallCredentials; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; import org.threeten.bp.Duration; /** @@ -93,8 +86,6 @@ public class Publisher { private final String topicName; private final BatchingSettings batchingSettings; - private final RetrySettings retrySettings; - private final LongRandom longRandom; private final Lock messagesBatchLock; private List messagesBatch; @@ -102,12 +93,11 @@ public class Publisher { private final AtomicBoolean activeAlarm; - private final Channel channel; - @Nullable private final CallCredentials callCredentials; + private final PublisherStub publisherStub; private final ScheduledExecutorService executor; private final AtomicBoolean shutdown; - private final List closeables = new ArrayList<>(); + private final List closeables; private final MessageWaiter messagesWaiter; private ScheduledFuture currentAlarmFuture; @@ -125,40 +115,43 @@ private Publisher(Builder builder) throws IOException { topicName = builder.topicName; this.batchingSettings = builder.batchingSettings; - this.retrySettings = builder.retrySettings; - this.longRandom = builder.longRandom; messagesBatch = new LinkedList<>(); messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); if (builder.executorProvider.shouldAutoClose()) { - closeables.add(new ExecutorAsBackgroundResource(executor)); + closeables = + Collections.singletonList(new ExecutorAsBackgroundResource(executor)); + } else { + closeables = Collections.emptyList(); } - TransportChannelProvider channelProvider = builder.channelProvider; - if (channelProvider.needsExecutor()) { - channelProvider = channelProvider.withExecutor(executor); - } - if (channelProvider.needsHeaders()) { - Map headers = - ImmutableMap.builder() - .putAll(builder.headerProvider.getHeaders()) - .putAll(builder.internalHeaderProvider.getHeaders()) - .build(); - channelProvider = channelProvider.withHeaders(headers); - } - if (channelProvider.needsEndpoint()) { - channelProvider = channelProvider.withEndpoint(TopicAdminSettings.getDefaultEndpoint()); - } - GrpcTransportChannel transportChannel = - (GrpcTransportChannel) channelProvider.getTransportChannel(); - channel = transportChannel.getChannel(); - if (channelProvider.shouldAutoClose()) { - closeables.add(transportChannel); + + // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry. + // We post-process this here to keep backward-compatibility. + RetrySettings retrySettings = builder.retrySettings; + if (retrySettings.getMaxAttempts() == 0) { + retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build(); } - Credentials credentials = builder.credentialsProvider.getCredentials(); - callCredentials = credentials == null ? null : MoreCallCredentials.from(credentials); + PublisherStubSettings.Builder stubSettings = + PublisherStubSettings.newBuilder() + .setCredentialsProvider(builder.credentialsProvider) + .setExecutorProvider(FixedExecutorProvider.create(executor)) + .setTransportChannelProvider(builder.channelProvider); + stubSettings + .publishSettings() + .setRetryableCodes( + StatusCode.Code.ABORTED, + StatusCode.Code.CANCELLED, + StatusCode.Code.DEADLINE_EXCEEDED, + StatusCode.Code.INTERNAL, + StatusCode.Code.RESOURCE_EXHAUSTED, + StatusCode.Code.UNKNOWN, + StatusCode.Code.UNAVAILABLE) + .setRetrySettings(retrySettings) + .setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()); + this.publisherStub = GrpcPublisherStub.create(stubSettings.build()); shutdown = new AtomicBoolean(false); messagesWaiter = new MessageWaiter(); @@ -320,21 +313,9 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { publishRequest.addMessages(outstandingPublish.message); } - long rpcTimeoutMs = - Math.round( - retrySettings.getInitialRpcTimeout().toMillis() - * Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBatch.attempt - 1)); - rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().toMillis()); - - PublisherFutureStub stub = - PublisherGrpc.newFutureStub(channel).withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS); - if (callCredentials != null) { - stub = stub.withCallCredentials(callCredentials); - } - - Futures.addCallback( - stub.publish(publishRequest.build()), - new FutureCallback() { + ApiFutures.addCallback( + publisherStub.publishCallable().futureCall(publishRequest.build()), + new ApiFutureCallback() { @Override public void onSuccess(PublishResponse result) { try { @@ -364,37 +345,13 @@ public void onSuccess(PublishResponse result) { @Override public void onFailure(Throwable t) { - long nextBackoffDelay = - computeNextBackoffDelayMs(outstandingBatch, retrySettings, longRandom); - - if (!isRetryable(t) - || retrySettings.getMaxAttempts() > 0 - && outstandingBatch.getAttempt() > retrySettings.getMaxAttempts() - || System.currentTimeMillis() + nextBackoffDelay - > outstandingBatch.creationTime + retrySettings.getTotalTimeout().toMillis()) { - try { - ApiException gaxException = - ApiExceptionFactory.createException( - t, GrpcStatusCode.of(Status.fromThrowable(t).getCode()), false); - for (OutstandingPublish outstandingPublish : - outstandingBatch.outstandingPublishes) { - outstandingPublish.publishResult.setException(gaxException); - } - } finally { - messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); + try { + for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) { + outstandingPublish.publishResult.setException(t); } - return; + } finally { + messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); } - - executor.schedule( - new Runnable() { - @Override - public void run() { - publishOutstandingBatch(outstandingBatch); - } - }, - nextBackoffDelay, - TimeUnit.MILLISECONDS); } }); } @@ -459,43 +416,13 @@ public void shutdown() throws Exception { for (AutoCloseable closeable : closeables) { closeable.close(); } + publisherStub.shutdown(); } private boolean hasBatchingBytes() { return getMaxBatchBytes() > 0; } - private static long computeNextBackoffDelayMs( - OutstandingBatch outstandingBatch, RetrySettings retrySettings, LongRandom longRandom) { - long delayMillis = - Math.round( - retrySettings.getInitialRetryDelay().toMillis() - * Math.pow(retrySettings.getRetryDelayMultiplier(), outstandingBatch.attempt - 1)); - delayMillis = Math.min(retrySettings.getMaxRetryDelay().toMillis(), delayMillis); - outstandingBatch.attempt++; - return longRandom.nextLong(0, delayMillis); - } - - private boolean isRetryable(Throwable t) { - Status status = Status.fromThrowable(t); - switch (status.getCode()) { - case ABORTED: - case CANCELLED: - case DEADLINE_EXCEEDED: - case INTERNAL: - case RESOURCE_EXHAUSTED: - case UNKNOWN: - case UNAVAILABLE: - return true; - default: - return false; - } - } - - interface LongRandom { - long nextLong(long least, long bound); - } - /** * Constructs a new {@link Builder} using the given topic. * @@ -565,13 +492,6 @@ public static final class Builder { .setRpcTimeoutMultiplier(2) .setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT) .build(); - static final LongRandom DEFAULT_LONG_RANDOM = - new LongRandom() { - @Override - public long nextLong(long least, long bound) { - return ThreadLocalRandom.current().nextLong(least, bound); - } - }; private static final int THREADS_PER_CPU = 5; static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = @@ -585,7 +505,6 @@ public long nextLong(long least, long bound) { BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS; RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; - LongRandom longRandom = DEFAULT_LONG_RANDOM; TransportChannelProvider channelProvider = TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); @@ -673,12 +592,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) { return this; } - @InternalApi - Builder setLongRandom(LongRandom longRandom) { - this.longRandom = Preconditions.checkNotNull(longRandom); - return this; - } - /** Gives the ability to set a custom executor to be used by the library. */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java index 59879712ab53..13704b27d1b0 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java @@ -93,11 +93,11 @@ public void setupScheduleExpectation(Duration delay) { expectedWorkQueue.add(delay); } } - + /** - * Blocks the current thread until all the work - * {@link FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been - * scheduled in the executor. + * Blocks the current thread until all the work {@link + * FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been scheduled in + * the executor. */ public void waitForExpectedWork() { synchronized (expectedWorkQueue) { @@ -224,7 +224,7 @@ ScheduledFuture schedulePendingCallable(PendingCallable callable) { } expectedWorkQueue.notifyAll(); } - + return callable.getScheduledFuture(); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 79e0015275f7..bea0ec4e9194 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub.v1; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; @@ -28,8 +29,8 @@ import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.core.NoCredentialsProvider; -import com.google.api.gax.grpc.GrpcTransportChannel; -import com.google.api.gax.rpc.TransportChannel; +import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.rpc.DataLossException; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.pubsub.v1.Publisher.Builder; import com.google.protobuf.ByteString; @@ -39,12 +40,8 @@ import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusException; -import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; -import java.io.IOException; -import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -61,53 +58,7 @@ public class PublisherImplTest { InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build(); private static final TransportChannelProvider TEST_CHANNEL_PROVIDER = - new TransportChannelProvider() { - @Override - public boolean shouldAutoClose() { - return true; - } - - @Override - public boolean needsExecutor() { - return false; - } - - @Override - public TransportChannelProvider withExecutor(ScheduledExecutorService executor) { - return null; - } - - @Override - public boolean needsHeaders() { - return false; - } - - @Override - public TransportChannelProvider withHeaders(Map headers) { - return null; - } - - @Override - public boolean needsEndpoint() { - return false; - } - - @Override - public TransportChannelProvider withEndpoint(String endpoint) { - return null; - } - - @Override - public TransportChannel getTransportChannel() throws IOException { - return GrpcTransportChannel.create(InProcessChannelBuilder.forName("test-server").build()); - } - - @Override - public String getTransportName() { - return null; - } - - }; + LocalChannelProvider.create("test-server"); private FakeScheduledExecutorService fakeExecutor; @@ -284,6 +235,27 @@ private ApiFuture sendTestMessage(Publisher publisher, String data) { PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build()); } + @Test + public void testErrorPropagation() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .build()) + .build(); + testPublisherServiceImpl.addPublishError(Status.DATA_LOSS.asException()); + try { + sendTestMessage(publisher, "A").get(); + fail("should throw exception"); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(DataLossException.class); + } + } + @Test public void testPublishFailureRetries() throws Exception { Publisher publisher = @@ -383,41 +355,6 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception publisher.shutdown(); } - public void testPublishFailureRetries_exceededsRetryDuration() throws Exception { - Publisher publisher = - getTestPublisherBuilder() - .setExecutorProvider(SINGLE_THREAD_EXECUTOR) - .setRetrySettings( - Publisher.Builder.DEFAULT_RETRY_SETTINGS - .toBuilder() - .setTotalTimeout(Duration.ofSeconds(10)) - .build()) - .setBatchingSettings( - Publisher.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) - .build()) - .build(); // To demonstrate that reaching duration will trigger publish - - // With exponential backoff, starting at 5ms we should have no more than 11 retries - for (int i = 0; i < 11; ++i) { - testPublisherServiceImpl.addPublishError(new FakeException()); - } - ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); - - try { - publishFuture1.get(); - } catch (ExecutionException e) { - if (!(e.getCause() instanceof FakeException)) { - throw new IllegalStateException("unexpected exception", e); - } - } finally { - assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 10); - publisher.shutdown(); - } - } - @Test(expected = ExecutionException.class) public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exception { Publisher publisher = @@ -627,13 +564,6 @@ private Builder getTestPublisherBuilder() { return Publisher.newBuilder(TEST_TOPIC) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .setChannelProvider(TEST_CHANNEL_PROVIDER) - .setCredentialsProvider(NoCredentialsProvider.create()) - .setLongRandom( - new Publisher.LongRandom() { - @Override - public long nextLong(long least, long bound) { - return bound - 1; - } - }); + .setCredentialsProvider(NoCredentialsProvider.create()); } }