diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index c1f457a81fe0..51d1cf5d4ed7 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -99,7 +99,7 @@ public class Publisher { private final PublisherStub publisherStub; private final ScheduledExecutorService executor; - private final SequentialExecutorService sequentialExecutor; + private final SequentialExecutorService sequentialExecutor; private final AtomicBoolean shutdown; private final List closeables; private final MessageWaiter messagesWaiter; @@ -127,7 +127,7 @@ private Publisher(Builder builder) throws IOException { messagesBatchLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); executor = builder.executorProvider.getExecutor(); - sequentialExecutor = new SequentialExecutorService<>(executor); + sequentialExecutor = new SequentialExecutorService(executor); if (builder.executorProvider.shouldAutoClose()) { closeables = Collections.singletonList(new ExecutorAsBackgroundResource(executor)); @@ -398,9 +398,9 @@ public void run() { executor.execute(task); } else { // If ordering key is specified, publish the batch using the sequential executor. - Callable func = - new Callable() { - public ApiFuture call() { + Callable> func = + new Callable>() { + public ApiFuture call() { return publishCall(outstandingBatch); } }; diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index bbebd8db4de2..bbc3b2e74d67 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -41,7 +41,7 @@ interface CancellableRunnable extends Runnable { * key will be run only when its predecessor has been completed while tasks with different keys can * be run in parallel. */ -final class SequentialExecutorService { +final class SequentialExecutorService { private static final Logger logger = Logger.getLogger(SequentialExecutorService.class.getName()); private final CallbackExecutor callbackExecutor; @@ -56,7 +56,7 @@ final class SequentialExecutorService { * Runs asynchronous {@code Callable} tasks sequentially. If one of the tasks fails, other tasks * with the same key that have not been executed will be cancelled. */ - ApiFuture submit(final String key, final Callable callable) { + ApiFuture submit(final String key, final Callable> callable) { return callbackExecutor.submit(key, callable); } @@ -146,7 +146,7 @@ private static class CallbackExecutor extends SequentialExecutor { super(executor); } - ApiFuture submit(final String key, final Callable callable) { + ApiFuture submit(final String key, final Callable> callable) { final SettableApiFuture future = SettableApiFuture.create(); execute( key, diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java index b592194b410a..00ea85533e06 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SequentialExecutorServiceTest.java @@ -46,9 +46,9 @@ public final class SequentialExecutorServiceTest { .setExecutorThreadCount(5 * Runtime.getRuntime().availableProcessors()) .build(); - static class AsyncTaskCallable implements Callable { + static class AsyncTaskCallable implements Callable> { boolean isCalled = false; - SettableApiFuture result = SettableApiFuture.create(); + SettableApiFuture result = SettableApiFuture.create(); @Override public ApiFuture call() { @@ -71,8 +71,8 @@ public void finish() { @Test public void testExecutorRunsNextTaskWhenPrevResponseReceived() throws Exception { - SequentialExecutorService sequentialExecutorService = - new SequentialExecutorService<>(executorProvider.getExecutor()); + SequentialExecutorService sequentialExecutorService = + new SequentialExecutorService(executorProvider.getExecutor()); AsyncTaskCallable callable1 = new AsyncTaskCallable(); AsyncTaskCallable callable2 = new AsyncTaskCallable(); AsyncTaskCallable callable3 = new AsyncTaskCallable(); @@ -97,8 +97,8 @@ public void testExecutorRunsNextTaskWhenPrevResponseReceived() throws Exception @Test public void testExecutorRunsDifferentKeySimultaneously() throws Exception { - SequentialExecutorService sequentialExecutorService = - new SequentialExecutorService<>(executorProvider.getExecutor()); + SequentialExecutorService sequentialExecutorService = + new SequentialExecutorService(executorProvider.getExecutor()); AsyncTaskCallable callable1 = new AsyncTaskCallable(); AsyncTaskCallable callable2 = new AsyncTaskCallable(); AsyncTaskCallable callable3 = new AsyncTaskCallable(); @@ -126,8 +126,8 @@ public void testExecutorRunsDifferentKeySimultaneously() throws Exception { @Test public void testExecutorCancelsAllTasksWhenOneFailed() throws Exception { - SequentialExecutorService sequentialExecutorService = - new SequentialExecutorService<>(executorProvider.getExecutor()); + SequentialExecutorService sequentialExecutorService = + new SequentialExecutorService(executorProvider.getExecutor()); AsyncTaskCallable callable1 = new AsyncTaskCallable(); AsyncTaskCallable callable2 = new AsyncTaskCallable(); AsyncTaskCallable callable3 = new AsyncTaskCallable();