From 809cddc0a47b00cc2bab51d4a56b642064e8b49f Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 7 Mar 2018 18:32:19 +1100 Subject: [PATCH 1/5] pubsub: make Publisher use GAPIC client --- .../com/google/cloud/pubsub/v1/Publisher.java | 183 +++++------------- .../cloud/pubsub/v1/TopicAdminSettings.java | 5 +- .../pubsub/v1/stub/GrpcPublisherStub.java | 2 +- .../pubsub/v1/stub/PublisherStubSettings.java | 44 +++-- .../v1/FakeScheduledExecutorService.java | 10 +- .../cloud/pubsub/v1/PublisherImplTest.java | 114 +---------- 6 files changed, 88 insertions(+), 270 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 4db17723937e..b5134a0d7683 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -17,57 +17,46 @@ package com.google.cloud.pubsub.v1; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; -import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.ExecutorAsBackgroundResource; import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.grpc.GrpcStatusCode; -import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiExceptionFactory; import com.google.api.gax.rpc.HeaderProvider; import com.google.api.gax.rpc.NoHeaderProvider; +import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.TransportChannelProvider; -import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; -import com.google.pubsub.v1.PublisherGrpc; -import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub; import com.google.pubsub.v1.PubsubMessage; -import io.grpc.CallCredentials; -import io.grpc.Channel; import io.grpc.Status; -import io.grpc.auth.MoreCallCredentials; -import org.threeten.bp.Duration; - -import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; +import org.threeten.bp.Duration; /** * A Cloud Pub/Sub publisher, that is @@ -94,8 +83,6 @@ public class Publisher { private final String cachedTopicNameString; private final BatchingSettings batchingSettings; - private final RetrySettings retrySettings; - private final LongRandom longRandom; private final Lock messagesBatchLock; private List messagesBatch; @@ -103,12 +90,11 @@ public class Publisher { private final AtomicBoolean activeAlarm; - private final Channel channel; - @Nullable private final CallCredentials callCredentials; + private final TopicAdminClient topicClient; private final ScheduledExecutorService executor; private final AtomicBoolean shutdown; - private final List closeables = new ArrayList<>(); + private final List closeables; private final MessageWaiter messagesWaiter; private ScheduledFuture currentAlarmFuture; @@ -127,40 +113,42 @@ private Publisher(Builder builder) throws IOException { cachedTopicNameString = topicName.toString(); this.batchingSettings = builder.batchingSettings; - this.retrySettings = builder.retrySettings; - this.longRandom = builder.longRandom; messagesBatch = new LinkedList<>(); messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); if (builder.executorProvider.shouldAutoClose()) { - closeables.add(new ExecutorAsBackgroundResource(executor)); - } - TransportChannelProvider channelProvider = builder.channelProvider; - if (channelProvider.needsExecutor()) { - channelProvider = channelProvider.withExecutor(executor); + closeables = + Collections.singletonList(new ExecutorAsBackgroundResource(executor)); + } else { + closeables = Collections.emptyList(); } - if (channelProvider.needsHeaders()) { - Map headers = - ImmutableMap.builder() - .putAll(builder.headerProvider.getHeaders()) - .putAll(builder.internalHeaderProvider.getHeaders()) - .build(); - channelProvider = channelProvider.withHeaders(headers); - } - if (channelProvider.needsEndpoint()) { - channelProvider = channelProvider.withEndpoint(TopicAdminSettings.getDefaultEndpoint()); - } - GrpcTransportChannel transportChannel = - (GrpcTransportChannel) channelProvider.getTransportChannel(); - channel = transportChannel.getChannel(); - if (channelProvider.shouldAutoClose()) { - closeables.add(transportChannel); + + // Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry. + // We post-process this here to keep backward-compatibility. + RetrySettings retrySettings = builder.retrySettings; + if (retrySettings.getMaxAttempts() == 0) { + retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build(); } - Credentials credentials = builder.credentialsProvider.getCredentials(); - callCredentials = credentials == null ? null : MoreCallCredentials.from(credentials); + TopicAdminSettings.Builder topicSettings = + TopicAdminSettings.newBuilder() + .setCredentialsProvider(builder.credentialsProvider) + .setExecutorProvider(FixedExecutorProvider.create(executor)) + .setTransportChannelProvider(builder.channelProvider); + topicSettings + .publishSettings() + .setRetryableCodes( + StatusCode.Code.ABORTED, + StatusCode.Code.CANCELLED, + StatusCode.Code.DEADLINE_EXCEEDED, + StatusCode.Code.INTERNAL, + StatusCode.Code.RESOURCE_EXHAUSTED, + StatusCode.Code.UNKNOWN, + StatusCode.Code.UNAVAILABLE) + .setRetrySettings(retrySettings); + this.topicClient = TopicAdminClient.create(topicSettings.build()); shutdown = new AtomicBoolean(false); messagesWaiter = new MessageWaiter(); @@ -317,21 +305,9 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { publishRequest.addMessages(outstandingPublish.message); } - long rpcTimeoutMs = - Math.round( - retrySettings.getInitialRpcTimeout().toMillis() - * Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBatch.attempt - 1)); - rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().toMillis()); - - PublisherFutureStub stub = - PublisherGrpc.newFutureStub(channel).withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS); - if (callCredentials != null) { - stub = stub.withCallCredentials(callCredentials); - } - - Futures.addCallback( - stub.publish(publishRequest.build()), - new FutureCallback() { + ApiFutures.addCallback( + topicClient.publishCallable().futureCall(publishRequest.build()), + new ApiFutureCallback() { @Override public void onSuccess(PublishResponse result) { try { @@ -361,37 +337,16 @@ public void onSuccess(PublishResponse result) { @Override public void onFailure(Throwable t) { - long nextBackoffDelay = - computeNextBackoffDelayMs(outstandingBatch, retrySettings, longRandom); - - if (!isRetryable(t) - || retrySettings.getMaxAttempts() > 0 - && outstandingBatch.getAttempt() > retrySettings.getMaxAttempts() - || System.currentTimeMillis() + nextBackoffDelay - > outstandingBatch.creationTime + retrySettings.getTotalTimeout().toMillis()) { - try { - ApiException gaxException = - ApiExceptionFactory.createException( - t, GrpcStatusCode.of(Status.fromThrowable(t).getCode()), false); - for (OutstandingPublish outstandingPublish : - outstandingBatch.outstandingPublishes) { - outstandingPublish.publishResult.setException(gaxException); - } - } finally { - messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); + try { + ApiException gaxException = + ApiExceptionFactory.createException( + t, GrpcStatusCode.of(Status.fromThrowable(t).getCode()), false); + for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) { + outstandingPublish.publishResult.setException(gaxException); } - return; + } finally { + messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); } - - executor.schedule( - new Runnable() { - @Override - public void run() { - publishOutstandingBatch(outstandingBatch); - } - }, - nextBackoffDelay, - TimeUnit.MILLISECONDS); } }); } @@ -456,43 +411,13 @@ public void shutdown() throws Exception { for (AutoCloseable closeable : closeables) { closeable.close(); } + topicClient.shutdown(); } private boolean hasBatchingBytes() { return getMaxBatchBytes() > 0; } - private static long computeNextBackoffDelayMs( - OutstandingBatch outstandingBatch, RetrySettings retrySettings, LongRandom longRandom) { - long delayMillis = - Math.round( - retrySettings.getInitialRetryDelay().toMillis() - * Math.pow(retrySettings.getRetryDelayMultiplier(), outstandingBatch.attempt - 1)); - delayMillis = Math.min(retrySettings.getMaxRetryDelay().toMillis(), delayMillis); - outstandingBatch.attempt++; - return longRandom.nextLong(0, delayMillis); - } - - private boolean isRetryable(Throwable t) { - Status status = Status.fromThrowable(t); - switch (status.getCode()) { - case ABORTED: - case CANCELLED: - case DEADLINE_EXCEEDED: - case INTERNAL: - case RESOURCE_EXHAUSTED: - case UNKNOWN: - case UNAVAILABLE: - return true; - default: - return false; - } - } - - interface LongRandom { - long nextLong(long least, long bound); - } - /** * Constructs a new {@link Builder} using the given topic. * @@ -542,13 +467,6 @@ public static final class Builder { .setRpcTimeoutMultiplier(2) .setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT) .build(); - static final LongRandom DEFAULT_LONG_RANDOM = - new LongRandom() { - @Override - public long nextLong(long least, long bound) { - return ThreadLocalRandom.current().nextLong(least, bound); - } - }; private static final int THREADS_PER_CPU = 5; static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = @@ -562,7 +480,6 @@ public long nextLong(long least, long bound) { BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS; RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; - LongRandom longRandom = DEFAULT_LONG_RANDOM; TransportChannelProvider channelProvider = TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); @@ -650,12 +567,6 @@ public Builder setRetrySettings(RetrySettings retrySettings) { return this; } - @InternalApi - Builder setLongRandom(LongRandom longRandom) { - this.longRandom = Preconditions.checkNotNull(longRandom); - return this; - } - /** Gives the ability to set a custom executor to be used by the library. */ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/TopicAdminSettings.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/TopicAdminSettings.java index 37449df10e56..45fb021a1964 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/TopicAdminSettings.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/TopicAdminSettings.java @@ -24,7 +24,6 @@ import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.rpc.ApiClientHeaderProvider; -import com.google.api.gax.rpc.BatchingCallSettings; import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.ClientSettings; import com.google.api.gax.rpc.PagedCallSettings; @@ -91,7 +90,7 @@ public UnaryCallSettings createTopicSettings() { } /** Returns the object with the settings used for calls to publish. */ - /* package-private */ BatchingCallSettings publishSettings() { + /* package-private */ UnaryCallSettings publishSettings() { return ((PublisherStubSettings) getStubSettings()).publishSettings(); } @@ -242,7 +241,7 @@ public UnaryCallSettings.Builder createTopicSettings() { } /** Returns the builder for the settings used for calls to publish. */ - /* package-private */ BatchingCallSettings.Builder + /* package-private */ UnaryCallSettings.Builder publishSettings() { return getStubSettingsBuilder().publishSettings(); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/GrpcPublisherStub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/GrpcPublisherStub.java index 2e9425408edf..392ae08e2a3d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/GrpcPublisherStub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/GrpcPublisherStub.java @@ -222,7 +222,7 @@ protected GrpcPublisherStub(PublisherStubSettings settings, ClientContext client GrpcCallableFactory.createUnaryCallable( updateTopicTransportSettings, settings.updateTopicSettings(), clientContext); this.publishCallable = - GrpcCallableFactory.createBatchingCallable( + GrpcCallableFactory.createUnaryCallable( publishTransportSettings, settings.publishSettings(), clientContext); this.getTopicCallable = GrpcCallableFactory.createUnaryCallable( diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/PublisherStubSettings.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/PublisherStubSettings.java index d4d36ebe3a3e..2811c127dfd6 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/PublisherStubSettings.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/PublisherStubSettings.java @@ -21,9 +21,6 @@ import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; -import com.google.api.gax.batching.BatchingSettings; -import com.google.api.gax.batching.FlowControlSettings; -import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.batching.PartitionKey; import com.google.api.gax.batching.RequestBuilder; import com.google.api.gax.core.GaxProperties; @@ -36,7 +33,6 @@ import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.api.gax.rpc.BatchedRequestIssuer; -import com.google.api.gax.rpc.BatchingCallSettings; import com.google.api.gax.rpc.BatchingDescriptor; import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.PageContext; @@ -113,7 +109,7 @@ public class PublisherStubSettings extends StubSettings { private final UnaryCallSettings createTopicSettings; private final UnaryCallSettings updateTopicSettings; - private final BatchingCallSettings publishSettings; + private final UnaryCallSettings publishSettings; private final UnaryCallSettings getTopicSettings; private final PagedCallSettings listTopicsSettings; @@ -138,7 +134,7 @@ public UnaryCallSettings updateTopicSettings() { } /** Returns the object with the settings used for calls to publish. */ - public BatchingCallSettings publishSettings() { + public UnaryCallSettings publishSettings() { return publishSettings; } @@ -446,7 +442,7 @@ public static class Builder extends StubSettings.Builder createTopicSettings; private final UnaryCallSettings.Builder updateTopicSettings; - private final BatchingCallSettings.Builder publishSettings; + private final UnaryCallSettings.Builder publishSettings; private final UnaryCallSettings.Builder getTopicSettings; private final PagedCallSettings.Builder< ListTopicsRequest, ListTopicsResponse, ListTopicsPagedResponse> @@ -528,9 +524,10 @@ protected Builder(ClientContext clientContext) { updateTopicSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); - publishSettings = - BatchingCallSettings.newBuilder(PUBLISH_BATCHING_DESC) - .setBatchingSettings(BatchingSettings.newBuilder().build()); + publishSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); + // publishSettings = + // BatchingCallSettings.newBuilder(PUBLISH_BATCHING_DESC) + // .setBatchingSettings(BatchingSettings.newBuilder().build()); getTopicSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); @@ -586,16 +583,21 @@ private static Builder initDefaults(Builder builder) { builder .publishSettings() - .setBatchingSettings( - BatchingSettings.newBuilder() - .setElementCountThreshold(10L) - .setRequestByteThreshold(1024L) - .setDelayThreshold(Duration.ofMillis(10)) - .setFlowControlSettings( - FlowControlSettings.newBuilder() - .setLimitExceededBehavior(LimitExceededBehavior.Ignore) - .build()) - .build()); + .setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent")) + .setRetrySettings(RETRY_PARAM_DEFINITIONS.get("default")); + + // builder + // .publishSettings() + // .setBatchingSettings( + // BatchingSettings.newBuilder() + // .setElementCountThreshold(10L) + // .setRequestByteThreshold(1024L) + // .setDelayThreshold(Duration.ofMillis(10)) + // .setFlowControlSettings( + // FlowControlSettings.newBuilder() + // .setLimitExceededBehavior(LimitExceededBehavior.Ignore) + // .build()) + // .build()); builder .publishSettings() .setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("one_plus_delivery")) @@ -693,7 +695,7 @@ public UnaryCallSettings.Builder updateTopicSettings( } /** Returns the builder for the settings used for calls to publish. */ - public BatchingCallSettings.Builder publishSettings() { + public UnaryCallSettings.Builder publishSettings() { return publishSettings; } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java index 59879712ab53..13704b27d1b0 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java @@ -93,11 +93,11 @@ public void setupScheduleExpectation(Duration delay) { expectedWorkQueue.add(delay); } } - + /** - * Blocks the current thread until all the work - * {@link FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been - * scheduled in the executor. + * Blocks the current thread until all the work {@link + * FakeScheduledExecutorService#setupScheduleExpectation(Duration) expected} has been scheduled in + * the executor. */ public void waitForExpectedWork() { synchronized (expectedWorkQueue) { @@ -224,7 +224,7 @@ ScheduledFuture schedulePendingCallable(PendingCallable callable) { } expectedWorkQueue.notifyAll(); } - + return callable.getScheduledFuture(); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 1495c5f32071..74c6580b1ed7 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -16,14 +16,19 @@ package com.google.cloud.pubsub.v1; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import com.google.api.core.ApiFuture; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.core.NoCredentialsProvider; -import com.google.api.gax.grpc.GrpcTransportChannel; -import com.google.api.gax.rpc.TransportChannel; +import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.pubsub.v1.Publisher.Builder; import com.google.protobuf.ByteString; @@ -33,8 +38,8 @@ import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusException; -import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import java.util.concurrent.ExecutionException; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,17 +47,6 @@ import org.junit.runners.JUnit4; import org.threeten.bp.Duration; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - @RunWith(JUnit4.class) public class PublisherImplTest { @@ -62,53 +56,7 @@ public class PublisherImplTest { InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build(); private static final TransportChannelProvider TEST_CHANNEL_PROVIDER = - new TransportChannelProvider() { - @Override - public boolean shouldAutoClose() { - return true; - } - - @Override - public boolean needsExecutor() { - return false; - } - - @Override - public TransportChannelProvider withExecutor(ScheduledExecutorService executor) { - return null; - } - - @Override - public boolean needsHeaders() { - return false; - } - - @Override - public TransportChannelProvider withHeaders(Map headers) { - return null; - } - - @Override - public boolean needsEndpoint() { - return false; - } - - @Override - public TransportChannelProvider withEndpoint(String endpoint) { - return null; - } - - @Override - public TransportChannel getTransportChannel() throws IOException { - return GrpcTransportChannel.create(InProcessChannelBuilder.forName("test-server").build()); - } - - @Override - public String getTransportName() { - return null; - } - - }; + LocalChannelProvider.create("test-server"); private FakeScheduledExecutorService fakeExecutor; @@ -384,41 +332,6 @@ public void testPublishFailureRetries_maxRetriesSetUnlimited() throws Exception publisher.shutdown(); } - public void testPublishFailureRetries_exceededsRetryDuration() throws Exception { - Publisher publisher = - getTestPublisherBuilder() - .setExecutorProvider(SINGLE_THREAD_EXECUTOR) - .setRetrySettings( - Publisher.Builder.DEFAULT_RETRY_SETTINGS - .toBuilder() - .setTotalTimeout(Duration.ofSeconds(10)) - .build()) - .setBatchingSettings( - Publisher.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) - .build()) - .build(); // To demonstrate that reaching duration will trigger publish - - // With exponential backoff, starting at 5ms we should have no more than 11 retries - for (int i = 0; i < 11; ++i) { - testPublisherServiceImpl.addPublishError(new FakeException()); - } - ApiFuture publishFuture1 = sendTestMessage(publisher, "A"); - - try { - publishFuture1.get(); - } catch (ExecutionException e) { - if (!(e.getCause() instanceof FakeException)) { - throw new IllegalStateException("unexpected exception", e); - } - } finally { - assertTrue(testPublisherServiceImpl.getCapturedRequests().size() >= 10); - publisher.shutdown(); - } - } - @Test(expected = ExecutionException.class) public void testPublishFailureRetries_nonRetryableFailsImmediately() throws Exception { Publisher publisher = @@ -628,13 +541,6 @@ private Builder getTestPublisherBuilder() { return Publisher.newBuilder(TEST_TOPIC) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) .setChannelProvider(TEST_CHANNEL_PROVIDER) - .setCredentialsProvider(NoCredentialsProvider.create()) - .setLongRandom( - new Publisher.LongRandom() { - @Override - public long nextLong(long least, long bound) { - return bound - 1; - } - }); + .setCredentialsProvider(NoCredentialsProvider.create()); } } From 65ad4b05a64621532e58120b108db0ec2c8c9ada Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 8 Mar 2018 11:10:13 +1100 Subject: [PATCH 2/5] use new approach; keep generated code as-is --- .../com/google/cloud/pubsub/v1/Publisher.java | 3 +- .../cloud/pubsub/v1/TopicAdminSettings.java | 5 ++- .../pubsub/v1/stub/GrpcPublisherStub.java | 2 +- .../pubsub/v1/stub/PublisherStubSettings.java | 44 +++++++++---------- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index b5134a0d7683..2a0e669681a9 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -147,7 +147,8 @@ private Publisher(Builder builder) throws IOException { StatusCode.Code.RESOURCE_EXHAUSTED, StatusCode.Code.UNKNOWN, StatusCode.Code.UNAVAILABLE) - .setRetrySettings(retrySettings); + .setRetrySettings(retrySettings) + .setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()); this.topicClient = TopicAdminClient.create(topicSettings.build()); shutdown = new AtomicBoolean(false); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/TopicAdminSettings.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/TopicAdminSettings.java index 45fb021a1964..37449df10e56 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/TopicAdminSettings.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/TopicAdminSettings.java @@ -24,6 +24,7 @@ import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.rpc.ApiClientHeaderProvider; +import com.google.api.gax.rpc.BatchingCallSettings; import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.ClientSettings; import com.google.api.gax.rpc.PagedCallSettings; @@ -90,7 +91,7 @@ public UnaryCallSettings createTopicSettings() { } /** Returns the object with the settings used for calls to publish. */ - /* package-private */ UnaryCallSettings publishSettings() { + /* package-private */ BatchingCallSettings publishSettings() { return ((PublisherStubSettings) getStubSettings()).publishSettings(); } @@ -241,7 +242,7 @@ public UnaryCallSettings.Builder createTopicSettings() { } /** Returns the builder for the settings used for calls to publish. */ - /* package-private */ UnaryCallSettings.Builder + /* package-private */ BatchingCallSettings.Builder publishSettings() { return getStubSettingsBuilder().publishSettings(); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/GrpcPublisherStub.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/GrpcPublisherStub.java index 392ae08e2a3d..2e9425408edf 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/GrpcPublisherStub.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/GrpcPublisherStub.java @@ -222,7 +222,7 @@ protected GrpcPublisherStub(PublisherStubSettings settings, ClientContext client GrpcCallableFactory.createUnaryCallable( updateTopicTransportSettings, settings.updateTopicSettings(), clientContext); this.publishCallable = - GrpcCallableFactory.createUnaryCallable( + GrpcCallableFactory.createBatchingCallable( publishTransportSettings, settings.publishSettings(), clientContext); this.getTopicCallable = GrpcCallableFactory.createUnaryCallable( diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/PublisherStubSettings.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/PublisherStubSettings.java index 2811c127dfd6..d4d36ebe3a3e 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/PublisherStubSettings.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/stub/PublisherStubSettings.java @@ -21,6 +21,9 @@ import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.batching.PartitionKey; import com.google.api.gax.batching.RequestBuilder; import com.google.api.gax.core.GaxProperties; @@ -33,6 +36,7 @@ import com.google.api.gax.rpc.ApiCallContext; import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.api.gax.rpc.BatchedRequestIssuer; +import com.google.api.gax.rpc.BatchingCallSettings; import com.google.api.gax.rpc.BatchingDescriptor; import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.PageContext; @@ -109,7 +113,7 @@ public class PublisherStubSettings extends StubSettings { private final UnaryCallSettings createTopicSettings; private final UnaryCallSettings updateTopicSettings; - private final UnaryCallSettings publishSettings; + private final BatchingCallSettings publishSettings; private final UnaryCallSettings getTopicSettings; private final PagedCallSettings listTopicsSettings; @@ -134,7 +138,7 @@ public UnaryCallSettings updateTopicSettings() { } /** Returns the object with the settings used for calls to publish. */ - public UnaryCallSettings publishSettings() { + public BatchingCallSettings publishSettings() { return publishSettings; } @@ -442,7 +446,7 @@ public static class Builder extends StubSettings.Builder createTopicSettings; private final UnaryCallSettings.Builder updateTopicSettings; - private final UnaryCallSettings.Builder publishSettings; + private final BatchingCallSettings.Builder publishSettings; private final UnaryCallSettings.Builder getTopicSettings; private final PagedCallSettings.Builder< ListTopicsRequest, ListTopicsResponse, ListTopicsPagedResponse> @@ -524,10 +528,9 @@ protected Builder(ClientContext clientContext) { updateTopicSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); - publishSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); - // publishSettings = - // BatchingCallSettings.newBuilder(PUBLISH_BATCHING_DESC) - // .setBatchingSettings(BatchingSettings.newBuilder().build()); + publishSettings = + BatchingCallSettings.newBuilder(PUBLISH_BATCHING_DESC) + .setBatchingSettings(BatchingSettings.newBuilder().build()); getTopicSettings = UnaryCallSettings.newUnaryCallSettingsBuilder(); @@ -583,21 +586,16 @@ private static Builder initDefaults(Builder builder) { builder .publishSettings() - .setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent")) - .setRetrySettings(RETRY_PARAM_DEFINITIONS.get("default")); - - // builder - // .publishSettings() - // .setBatchingSettings( - // BatchingSettings.newBuilder() - // .setElementCountThreshold(10L) - // .setRequestByteThreshold(1024L) - // .setDelayThreshold(Duration.ofMillis(10)) - // .setFlowControlSettings( - // FlowControlSettings.newBuilder() - // .setLimitExceededBehavior(LimitExceededBehavior.Ignore) - // .build()) - // .build()); + .setBatchingSettings( + BatchingSettings.newBuilder() + .setElementCountThreshold(10L) + .setRequestByteThreshold(1024L) + .setDelayThreshold(Duration.ofMillis(10)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(LimitExceededBehavior.Ignore) + .build()) + .build()); builder .publishSettings() .setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("one_plus_delivery")) @@ -695,7 +693,7 @@ public UnaryCallSettings.Builder updateTopicSettings( } /** Returns the builder for the settings used for calls to publish. */ - public UnaryCallSettings.Builder publishSettings() { + public BatchingCallSettings.Builder publishSettings() { return publishSettings; } From f7eb4dfb4a0f5375370cfa4c98f71c489caa3e1f Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 12 Mar 2018 11:45:12 +1100 Subject: [PATCH 3/5] pr comment --- .../com/google/cloud/pubsub/v1/Publisher.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 2a0e669681a9..382b7c7d8f5d 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -36,6 +36,9 @@ import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.pubsub.v1.stub.GrpcPublisherStub; +import com.google.cloud.pubsub.v1.stub.PublisherStub; +import com.google.cloud.pubsub.v1.stub.PublisherStubSettings; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.pubsub.v1.ProjectTopicName; @@ -90,7 +93,7 @@ public class Publisher { private final AtomicBoolean activeAlarm; - private final TopicAdminClient topicClient; + private final PublisherStub publisherStub; private final ScheduledExecutorService executor; private final AtomicBoolean shutdown; @@ -132,12 +135,12 @@ private Publisher(Builder builder) throws IOException { retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build(); } - TopicAdminSettings.Builder topicSettings = - TopicAdminSettings.newBuilder() + PublisherStubSettings.Builder stubSettings = + PublisherStubSettings.newBuilder() .setCredentialsProvider(builder.credentialsProvider) .setExecutorProvider(FixedExecutorProvider.create(executor)) .setTransportChannelProvider(builder.channelProvider); - topicSettings + stubSettings .publishSettings() .setRetryableCodes( StatusCode.Code.ABORTED, @@ -149,7 +152,7 @@ private Publisher(Builder builder) throws IOException { StatusCode.Code.UNAVAILABLE) .setRetrySettings(retrySettings) .setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()); - this.topicClient = TopicAdminClient.create(topicSettings.build()); + this.publisherStub = GrpcPublisherStub.create(stubSettings.build()); shutdown = new AtomicBoolean(false); messagesWaiter = new MessageWaiter(); @@ -307,7 +310,7 @@ private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) { } ApiFutures.addCallback( - topicClient.publishCallable().futureCall(publishRequest.build()), + publisherStub.publishCallable().futureCall(publishRequest.build()), new ApiFutureCallback() { @Override public void onSuccess(PublishResponse result) { @@ -412,7 +415,7 @@ public void shutdown() throws Exception { for (AutoCloseable closeable : closeables) { closeable.close(); } - topicClient.shutdown(); + publisherStub.shutdown(); } private boolean hasBatchingBytes() { From c97c8bdc5db87199365be525291f228ddaa4746f Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 14 Mar 2018 09:48:51 +1100 Subject: [PATCH 4/5] pr comment --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index da3dcdf0ff5a..56895d66b400 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -346,11 +346,8 @@ public void onSuccess(PublishResponse result) { @Override public void onFailure(Throwable t) { try { - ApiException gaxException = - ApiExceptionFactory.createException( - t, GrpcStatusCode.of(Status.fromThrowable(t).getCode()), false); for (OutstandingPublish outstandingPublish : outstandingBatch.outstandingPublishes) { - outstandingPublish.publishResult.setException(gaxException); + outstandingPublish.publishResult.setException(t); } } finally { messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); From 754074461f1e8688df5efa9e06230a44c209f67e Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 14 Mar 2018 11:50:59 +1100 Subject: [PATCH 5/5] commit the test too --- .../cloud/pubsub/v1/PublisherImplTest.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 8983c0275651..bea0ec4e9194 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub.v1; +import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; @@ -29,6 +30,7 @@ import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.testing.LocalChannelProvider; +import com.google.api.gax.rpc.DataLossException; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.pubsub.v1.Publisher.Builder; import com.google.protobuf.ByteString; @@ -233,6 +235,27 @@ private ApiFuture sendTestMessage(Publisher publisher, String data) { PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(data)).build()); } + @Test + public void testErrorPropagation() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .build()) + .build(); + testPublisherServiceImpl.addPublishError(Status.DATA_LOSS.asException()); + try { + sendTestMessage(publisher, "A").get(); + fail("should throw exception"); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(DataLossException.class); + } + } + @Test public void testPublishFailureRetries() throws Exception { Publisher publisher =