From b5dd1c187d1103398cfdc38a018dc919a774d96d Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 2 Feb 2018 14:06:25 -0800 Subject: [PATCH 1/7] Add MoreFutures utility --- .../org/apache/beam/sdk/util/MoreFutures.java | 251 ++++++++++++++++++ .../beam/sdk/util/ThrowingRunnable.java | 24 ++ .../beam/sdk/util/ThrowingSupplier.java | 26 ++ 3 files changed, 301 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/ThrowingRunnable.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/ThrowingSupplier.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java new file mode 100644 index 000000000000..7b49503e0992 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import com.google.auto.value.AutoValue; +import edu.umd.cs.findbugs.annotations.SuppressWarnings; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +/** + * Utilities to do future programming with Java 8. + * + *

Standards for these utilities: + * + *

+ */ +public class MoreFutures { + + /** + * Gets the result of the given future. + * + *

This utility is provided so consumers of futures need not even convert to {@link + * CompletableFuture}, an interface that is only suitable for producers of futures. + */ + public static T get(CompletionStage future) + throws InterruptedException, ExecutionException { + return future.toCompletableFuture().get(); + } + + /** + * Gets the result of the given future. + * + *

This utility is provided so consumers of futures need not even convert to {@link + * CompletableFuture}, an interface that is only suitable for producers of futures. + */ + public static T get(CompletionStage future, long duration, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return future.toCompletableFuture().get(duration, unit); + } + + /** + * Indicates whether the future is done. + * + *

This utility is provided so consumers of futures need not even convert to {@link + * CompletableFuture}, an interface that is only suitable for producers of futures. + */ + public static boolean isDone(CompletionStage future) { + return future.toCompletableFuture().isDone(); + } + + /** + * Indicates whether the future is cancelled. + * + *

This utility is provided so consumers of futures need not even convert to {@link + * CompletableFuture}, an interface that is only suitable for producers of futures. + */ + public static boolean isCancelled(CompletionStage future) { + return future.toCompletableFuture().isCancelled(); + } + + /** + * Like {@link CompletableFuture#supplyAsync(Supplier)} but for {@link ThrowingSupplier}. + * + *

If the {@link ThrowingSupplier} throws an exception, the future completes exceptionally. + */ + public static CompletionStage supplyAsync( + ThrowingSupplier supplier, ExecutorService executorService) { + CompletableFuture result = new CompletableFuture<>(); + CompletableFuture.runAsync( + () -> { + try { + result.complete(supplier.get()); + } catch (InterruptedException e) { + result.completeExceptionally(e); + Thread.currentThread().interrupt(); + } catch (Throwable t) { + result.completeExceptionally(t); + } + }, + executorService); + return result; + } + + /** + * Shorthand for {@link #supplyAsync(ThrowingSupplier, ExecutorService)} using {@link + * ForkJoinPool#commonPool()}. + */ + public static CompletionStage supplyAsync(ThrowingSupplier supplier) { + return supplyAsync(supplier, ForkJoinPool.commonPool()); + } + + /** + * Like {@link CompletableFuture#runAsync} but for {@link ThrowingRunnable}. + * + *

If the {@link ThrowingRunnable} throws an exception, the future completes exceptionally. + */ + public static CompletionStage runAsync( + ThrowingRunnable runnable, ExecutorService executorService) { + CompletableFuture result = new CompletableFuture<>(); + CompletableFuture.runAsync( + () -> { + try { + runnable.run(); + result.complete(null); + } catch (InterruptedException e) { + result.completeExceptionally(e); + Thread.currentThread().interrupt(); + } catch (Throwable t) { + result.completeExceptionally(t); + } + }, + executorService); + return result; + } + + /** + * Shorthand for {@link #runAsync(ThrowingRunnable, ExecutorService)} using {@link + * ForkJoinPool#commonPool()}. + */ + public static CompletionStage runAsync(ThrowingRunnable runnable) { + return runAsync(runnable, ForkJoinPool.commonPool()); + } + + /** + * Like {@link CompletableFuture#allOf} but returning the result of constituent futures. + */ + public static CompletionStage> allAsList( + Collection> futures) { + + // CompletableFuture.allOf completes exceptionally if any of the futures do. + // We have to gather the results separately. + CompletionStage blockAndDiscard = + CompletableFuture.allOf(futuresToCompletableFutures(futures)); + + return blockAndDiscard.thenApply( + nothing -> + futures + .stream() + .map(future -> future.toCompletableFuture().join()) + .collect(Collectors.toList())); + } + + /** + * An object that represents either a result or an exceptional termination. + * + *

This is used, for example, in aggregating the results of many future values in {@link + * #allAsList(Collection)}. + */ + @SuppressWarnings(value = "NM_CLASS_NOT_EXCEPTION", + justification = "The class does hold an exception; its name is accurate.") + @AutoValue + public abstract static class ExceptionOrResult { + + /** + * Describes whether the result was an exception. + */ + public enum IsException { + EXCEPTION, + RESULT + } + + public abstract IsException isException(); + + public abstract @Nullable + T getResult(); + + public abstract @Nullable + Throwable getException(); + + public static ExceptionOrResult exception(Throwable throwable) { + return new AutoValue_MoreFutures_ExceptionOrResult(IsException.EXCEPTION, null, throwable); + } + + public static ExceptionOrResult result(T result) { + return new AutoValue_MoreFutures_ExceptionOrResult(IsException.EXCEPTION, result, null); + } + } + + /** + * Like {@link #allAsList} but return a list . + */ + public static CompletionStage>> allAsListWithExceptions( + Collection> futures) { + + // CompletableFuture.allOf completes exceptionally if any of the futures do. + // We have to gather the results separately. + CompletionStage blockAndDiscard = + CompletableFuture.allOf(futuresToCompletableFutures(futures)) + .whenComplete((ignoredValues, arbitraryException) -> { + }); + + return blockAndDiscard.thenApply( + nothing -> + futures + .stream() + .map( + future -> { + // The limited scope of the exceptions wrapped allows CancellationException + // to still be thrown. + try { + return ExceptionOrResult.result(future.toCompletableFuture().join()); + } catch (CompletionException exc) { + return ExceptionOrResult.exception(exc); + } + }) + .collect(Collectors.toList())); + } + + /** + * Helper to convert a list of futures into an array for use in {@link CompletableFuture} vararg + * combinators. + */ + private static CompletableFuture[] futuresToCompletableFutures( + Collection> futures) { + CompletableFuture[] completableFutures = new CompletableFuture[futures.size()]; + int i = 0; + for (CompletionStage future : futures) { + completableFutures[i] = future.toCompletableFuture(); + ++i; + } + return completableFutures; + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ThrowingRunnable.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ThrowingRunnable.java new file mode 100644 index 000000000000..7b65de3dd238 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ThrowingRunnable.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +/** Like {@link Runnable} but allowed to throw any exception. */ +@FunctionalInterface +public interface ThrowingRunnable { + void run() throws Exception; +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ThrowingSupplier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ThrowingSupplier.java new file mode 100644 index 000000000000..4d8c43521dbc --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ThrowingSupplier.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import java.util.function.Supplier; + +/** Like {@link Supplier} but allowed to throw any exception. */ +@FunctionalInterface +public interface ThrowingSupplier { + T get() throws Exception; +} From e71baf013cd0f3dac0c69093bc8da90be8129257 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 1 Feb 2018 14:39:26 -0800 Subject: [PATCH 2/7] Switch runners/java-fn-execution from Guava futures to Java 8 futures --- .../control/FnApiControlClient.java | 35 ++++++++----------- .../fnexecution/control/SdkHarnessClient.java | 34 +++++++----------- .../FnApiControlClientPoolServiceTest.java | 9 ++--- .../control/FnApiControlClientTest.java | 30 ++++++++-------- .../control/SdkHarnessClientTest.java | 17 ++++----- 5 files changed, 56 insertions(+), 69 deletions(-) diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java index 1019cd6053d9..d747e0f65f0e 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClient.java @@ -17,20 +17,17 @@ */ package org.apache.beam.runners.fnexecution.control; -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import java.io.Closeable; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.model.fnexecution.v1.BeamFnApi; -import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse; import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +49,8 @@ public class FnApiControlClient implements Closeable { // All writes to this StreamObserver need to be synchronized. private final StreamObserver requestReceiver; private final ResponseStreamObserver responseObserver = new ResponseStreamObserver(); - private final ConcurrentMap> outstandingRequests; + private final ConcurrentMap> + outstandingRequests; private AtomicBoolean isClosed = new AtomicBoolean(false); private FnApiControlClient(StreamObserver requestReceiver) { @@ -72,16 +70,11 @@ public static FnApiControlClient forRequestObserver( return new FnApiControlClient(requestObserver); } - public ListenableFuture handle( + public CompletionStage handle( BeamFnApi.InstructionRequest request) { LOG.debug("Sending InstructionRequest {}", request); - SettableFuture resultFuture = SettableFuture.create(); - SettableFuture previousResponseFuture = - outstandingRequests.put(request.getInstructionId(), resultFuture); - checkArgument( - previousResponseFuture == null, - "Tried to handle multiple instructions with the same ID %s", - request.getInstructionId()); + CompletableFuture resultFuture = new CompletableFuture<>(); + outstandingRequests.put(request.getInstructionId(), resultFuture); requestReceiver.onNext(request); return resultFuture; } @@ -102,7 +95,7 @@ private void closeAndTerminateOutstandingRequests(Throwable cause) { } // Make a copy of the map to make the view of the outstanding requests consistent. - Map> outstandingRequestsCopy = + Map> outstandingRequestsCopy = new ConcurrentHashMap<>(outstandingRequests); outstandingRequests.clear(); @@ -117,9 +110,9 @@ private void closeAndTerminateOutstandingRequests(Throwable cause) { "{} closed, clearing outstanding requests {}", FnApiControlClient.class.getSimpleName(), outstandingRequestsCopy); - for (SettableFuture outstandingRequest : + for (CompletableFuture outstandingRequest : outstandingRequestsCopy.values()) { - outstandingRequest.setException(cause); + outstandingRequest.completeExceptionally(cause); } } @@ -135,13 +128,13 @@ private class ResponseStreamObserver implements StreamObserver completableFuture = + CompletableFuture responseFuture = outstandingRequests.remove(response.getInstructionId()); - if (completableFuture != null) { + if (responseFuture != null) { if (response.getError().isEmpty()) { - completableFuture.set(response); + responseFuture.complete(response); } else { - completableFuture.setException( + responseFuture.completeExceptionally( new RuntimeException(String.format( "Error received from SDK harness for instruction %s: %s", response.getInstructionId(), diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java index 3a81014a58e6..1c274840d647 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java @@ -20,14 +20,11 @@ import com.google.auto.value.AutoValue; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse; @@ -76,20 +73,20 @@ public String getId() { */ public class BundleProcessor { private final String processBundleDescriptorId; - private final Future registrationFuture; + private final CompletionStage registrationFuture; private final RemoteInputDestination> remoteInput; private BundleProcessor( String processBundleDescriptorId, - Future registrationFuture, + CompletionStage registrationFuture, RemoteInputDestination> remoteInput) { this.processBundleDescriptorId = processBundleDescriptorId; this.registrationFuture = registrationFuture; this.remoteInput = remoteInput; } - public Future getRegistrationFuture() { + public CompletionStage getRegistrationFuture() { return registrationFuture; } @@ -103,7 +100,7 @@ public ActiveBundle newBundle( Map> outputReceivers) { String bundleId = idGenerator.getId(); - final ListenableFuture genericResponse = + final CompletionStage genericResponse = fnApiControlClient.handle( BeamFnApi.InstructionRequest.newBuilder() .setInstructionId(bundleId) @@ -118,11 +115,8 @@ public ActiveBundle newBundle( ProcessBundleDescriptor.class.getSimpleName(), processBundleDescriptorId); - ListenableFuture specificResponse = - Futures.transform( - genericResponse, - InstructionResponse::getProcessBundle, - MoreExecutors.directExecutor()); + CompletionStage specificResponse = + genericResponse.thenApply(InstructionResponse::getProcessBundle); Map outputClients = new HashMap<>(); for (Map.Entry> targetReceiver : outputReceivers.entrySet()) { @@ -155,14 +149,14 @@ private InboundDataClient attachReceiver( public abstract static class ActiveBundle { public abstract String getBundleId(); - public abstract Future getBundleResponse(); + public abstract CompletionStage getBundleResponse(); public abstract CloseableFnDataReceiver> getInputReceiver(); public abstract Map getOutputClients(); public static ActiveBundle create( String bundleId, - Future response, + CompletionStage response, CloseableFnDataReceiver> dataReceiver, Map outputClients) { return new AutoValue_SdkHarnessClient_ActiveBundle<>( @@ -228,7 +222,7 @@ public Map register( LOG.debug("Registering {}", processBundleDescriptors.keySet()); // TODO: validate that all the necessary data endpoints are known - ListenableFuture genericResponse = + CompletionStage genericResponse = fnApiControlClient.handle( BeamFnApi.InstructionRequest.newBuilder() .setInstructionId(idGenerator.getId()) @@ -238,11 +232,9 @@ public Map register( .build()) .build()); - ListenableFuture registerResponseFuture = - Futures.transform( - genericResponse, - InstructionResponse::getRegister, - MoreExecutors.directExecutor()); + CompletionStage registerResponseFuture = + genericResponse.thenApply(InstructionResponse::getRegister); + for (Map.Entry>> descriptorInputEntry : processBundleDescriptors.entrySet()) { clientProcessors.put( diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java index 9392ee0ff335..8f4a09f28fbc 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java @@ -23,11 +23,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; -import com.google.common.util.concurrent.ListenableFuture; import io.grpc.stub.StreamObserver; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletionStage; import java.util.concurrent.LinkedBlockingQueue; import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.sdk.util.MoreFutures; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -52,14 +53,14 @@ public void testIncomingConnection() throws Exception { // Check that the client is wired up to the request channel String id = "fakeInstruction"; - ListenableFuture responseFuture = + CompletionStage responseFuture = client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); verify(requestObserver).onNext(any(BeamFnApi.InstructionRequest.class)); - assertThat(responseFuture.isDone(), is(false)); + assertThat(MoreFutures.isDone(responseFuture), is(false)); // Check that the response channel really came from the client responseObserver.onNext( BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build()); - responseFuture.get(); + MoreFutures.get(responseFuture); } } diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java index 31a9c0a6825c..c33105fa3616 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientTest.java @@ -24,13 +24,13 @@ import static org.mockito.Matchers.any; import static org.mockito.Mockito.verify; -import com.google.common.util.concurrent.ListenableFuture; import io.grpc.stub.StreamObserver; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest; import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse; +import org.apache.beam.sdk.util.MoreFutures; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -67,13 +67,13 @@ public void testRequestSent() { public void testRequestSuccess() throws Exception { String id = "successfulInstruction"; - Future responseFuture = + CompletionStage responseFuture = client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); client .asResponseObserver() .onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build()); - BeamFnApi.InstructionResponse response = responseFuture.get(); + BeamFnApi.InstructionResponse response = MoreFutures.get(responseFuture); assertThat(response.getInstructionId(), equalTo(id)); } @@ -81,7 +81,7 @@ public void testRequestSuccess() throws Exception { @Test public void testRequestError() throws Exception { String id = "instructionId"; - ListenableFuture responseFuture = + CompletionStage responseFuture = client.handle(InstructionRequest.newBuilder().setInstructionId(id).build()); String error = "Oh no an error!"; client @@ -94,7 +94,7 @@ public void testRequestError() throws Exception { thrown.expectCause(isA(RuntimeException.class)); thrown.expectMessage(error); - responseFuture.get(); + MoreFutures.get(responseFuture); } @Test @@ -102,22 +102,22 @@ public void testUnknownResponseIgnored() throws Exception { String id = "actualInstruction"; String unknownId = "unknownInstruction"; - ListenableFuture responseFuture = + CompletionStage responseFuture = client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); client .asResponseObserver() .onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId(unknownId).build()); - assertThat(responseFuture.isDone(), is(false)); - assertThat(responseFuture.isCancelled(), is(false)); + assertThat(MoreFutures.isDone(responseFuture), is(false)); + assertThat(MoreFutures.isCancelled(responseFuture), is(false)); } @Test public void testOnCompletedCancelsOutstanding() throws Exception { String id = "clientHangUpInstruction"; - Future responseFuture = + CompletionStage responseFuture = client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); client.asResponseObserver().onCompleted(); @@ -125,14 +125,14 @@ public void testOnCompletedCancelsOutstanding() throws Exception { thrown.expect(ExecutionException.class); thrown.expectCause(isA(IllegalStateException.class)); thrown.expectMessage("closed"); - responseFuture.get(); + MoreFutures.get(responseFuture); } @Test public void testOnErrorCancelsOutstanding() throws Exception { String id = "errorInstruction"; - Future responseFuture = + CompletionStage responseFuture = client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); class FrazzleException extends Exception {} @@ -140,14 +140,14 @@ class FrazzleException extends Exception {} thrown.expect(ExecutionException.class); thrown.expectCause(isA(FrazzleException.class)); - responseFuture.get(); + MoreFutures.get(responseFuture); } @Test public void testCloseCancelsOutstanding() throws Exception { String id = "serverCloseInstruction"; - Future responseFuture = + CompletionStage responseFuture = client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build()); client.close(); @@ -155,6 +155,6 @@ public void testCloseCancelsOutstanding() throws Exception { thrown.expect(ExecutionException.class); thrown.expectCause(isA(IllegalStateException.class)); thrown.expectMessage("closed"); - responseFuture.get(); + MoreFutures.get(responseFuture); } } diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java index 53aed4164b85..0a18ff6844fe 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java @@ -26,7 +26,6 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.SettableFuture; import io.grpc.ManagedChannel; import io.grpc.inprocess.InProcessChannelBuilder; import java.io.IOException; @@ -34,6 +33,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -76,6 +76,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow.Coder; +import org.apache.beam.sdk.util.MoreFutures; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.values.TupleTag; @@ -107,7 +108,7 @@ public void testRegisterDoesNotCrash() throws Exception { String descriptorId1 = "descriptor1"; String descriptorId2 = "descriptor2"; - SettableFuture registerResponseFuture = SettableFuture.create(); + CompletableFuture registerResponseFuture = new CompletableFuture<>(); when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) .thenReturn(registerResponseFuture); @@ -146,10 +147,10 @@ public void testNewBundleNoDataDoesNotCrash() throws Exception { ProcessBundleDescriptor descriptor = ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build(); - SettableFuture processBundleResponseFuture = - SettableFuture.create(); + CompletableFuture processBundleResponseFuture = + new CompletableFuture<>(); when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class))) - .thenReturn(SettableFuture.create()) + .thenReturn(new CompletableFuture<>()) .thenReturn(processBundleResponseFuture); FullWindowedValueCoder coder = @@ -169,9 +170,9 @@ public void testNewBundleNoDataDoesNotCrash() throws Exception { // Currently there are no fields so there's nothing to check. This test is formulated // to match the pattern it should have if/when the response is meaningful. BeamFnApi.ProcessBundleResponse response = BeamFnApi.ProcessBundleResponse.getDefaultInstance(); - processBundleResponseFuture.set( + processBundleResponseFuture.complete( BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build()); - activeBundle.getBundleResponse().get(); + MoreFutures.get(activeBundle.getBundleResponse()); } @Test @@ -248,7 +249,7 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { bundleInputReceiver.accept(WindowedValue.valueInGlobalWindow("bar")); bundleInputReceiver.accept(WindowedValue.valueInGlobalWindow("baz")); } - activeBundle.getBundleResponse().get(); + MoreFutures.get(activeBundle.getBundleResponse()); for (InboundDataClient outputClient : activeBundle.getOutputClients().values()) { outputClient.awaitCompletion(); } From c757854240f12ff98fea25a2101dd7406508793d Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 1 Feb 2018 15:48:59 -0800 Subject: [PATCH 3/7] Switch DataflowRunner from Guava futures to Java 8 futures --- .../runners/dataflow/util/PackageUtil.java | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java index 9c0ce73ef36e..387b7e3a5900 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/PackageUtil.java @@ -32,9 +32,6 @@ import com.google.common.io.ByteSource; import com.google.common.io.CountingOutputStream; import com.google.common.io.Files; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.io.Closeable; import java.io.File; @@ -47,7 +44,9 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -61,6 +60,7 @@ import org.apache.beam.sdk.util.BackOffAdapter; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.sdk.util.MimeTypes; +import org.apache.beam.sdk.util.MoreFutures; import org.apache.beam.sdk.util.ZipFiles; import org.joda.time.Duration; import org.slf4j.Logger; @@ -95,9 +95,9 @@ class PackageUtil implements Closeable { */ private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor(); - private final ListeningExecutorService executorService; + private final ExecutorService executorService; - private PackageUtil(ListeningExecutorService executorService) { + private PackageUtil(ExecutorService executorService) { this.executorService = executorService; } @@ -107,7 +107,7 @@ public static PackageUtil withDefaultThreadPool() { MoreExecutors.platformThreadFactory()))); } - public static PackageUtil withExecutorService(ListeningExecutorService executorService) { + public static PackageUtil withExecutorService(ExecutorService executorService) { return new PackageUtil(executorService); } @@ -134,10 +134,10 @@ public int compare(PackageAttributes o1, PackageAttributes o2) { } /** Asynchronously computes {@link PackageAttributes} for a single staged file. */ - private ListenableFuture computePackageAttributes( + private CompletionStage computePackageAttributes( final DataflowPackage source, final String stagingPath) { - return executorService.submit( + return MoreFutures.supplyAsync( () -> { final File file = new File(source.getLocation()); if (!file.exists()) { @@ -150,7 +150,8 @@ private ListenableFuture computePackageAttributes( attributes = attributes.withPackageName(source.getName()); } return attributes; - }); + }, + executorService); } private boolean alreadyStaged(PackageAttributes attributes) throws IOException { @@ -165,12 +166,12 @@ private boolean alreadyStaged(PackageAttributes attributes) throws IOException { } /** Stages one file ("package") if necessary. */ - public ListenableFuture stagePackage( + public CompletionStage stagePackage( final PackageAttributes attributes, final Sleeper retrySleeper, final CreateOptions createOptions) { - return executorService.submit( - () -> stagePackageSynchronously(attributes, retrySleeper, createOptions)); + return MoreFutures.supplyAsync( + () -> stagePackageSynchronously(attributes, retrySleeper, createOptions), executorService); } /** Synchronously stages a package, with retry and backoff for resiliency. */ @@ -286,11 +287,11 @@ List stageClasspathElements( public DataflowPackage stageToFile( byte[] bytes, String target, String stagingPath, CreateOptions createOptions) { try { - return stagePackage( - PackageAttributes.forBytesToStage(bytes, target, stagingPath), - DEFAULT_SLEEPER, - createOptions) - .get() + return MoreFutures.get( + stagePackage( + PackageAttributes.forBytesToStage(bytes, target, stagingPath), + DEFAULT_SLEEPER, + createOptions)) .getPackageAttributes() .getDestination(); } catch (InterruptedException e) { @@ -331,7 +332,7 @@ List stageClasspathElements( final AtomicInteger numUploaded = new AtomicInteger(0); final AtomicInteger numCached = new AtomicInteger(0); - List> destinationPackages = new ArrayList<>(); + List> destinationPackages = new ArrayList<>(); for (String classpathElement : classpathElements) { DataflowPackage sourcePackage = new DataflowPackage(); @@ -350,15 +351,14 @@ List stageClasspathElements( continue; } - // TODO: Java 8 / Guava 23.0: FluentFuture - ListenableFuture stagingResult = - Futures.transformAsync( - computePackageAttributes(sourcePackage, stagingPath), - packageAttributes -> stagePackage(packageAttributes, retrySleeper, createOptions)); + CompletionStage stagingResult = + computePackageAttributes(sourcePackage, stagingPath) + .thenComposeAsync( + packageAttributes -> + stagePackage(packageAttributes, retrySleeper, createOptions)); - ListenableFuture stagedPackage = - Futures.transform( - stagingResult, + CompletionStage stagedPackage = + stagingResult.thenApply( stagingResult1 -> { if (stagingResult1.alreadyStaged()) { numCached.incrementAndGet(); @@ -372,19 +372,19 @@ List stageClasspathElements( } try { - ListenableFuture> stagingFutures = - Futures.allAsList(destinationPackages); + CompletionStage> stagingFutures = + MoreFutures.allAsList(destinationPackages); boolean finished = false; do { try { - stagingFutures.get(3L, TimeUnit.MINUTES); + MoreFutures.get(stagingFutures, 3L, TimeUnit.MINUTES); finished = true; } catch (TimeoutException e) { // finished will still be false LOG.info("Still staging {} files", classpathElements.size()); } } while (!finished); - List stagedPackages = stagingFutures.get(); + List stagedPackages = MoreFutures.get(stagingFutures); LOG.info( "Staging files complete: {} files cached, {} files newly uploaded", numCached.get(), numUploaded.get()); From 6bb65301381833a06c58ffa962db3f95111749a9 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 1 Feb 2018 15:49:16 -0800 Subject: [PATCH 4/7] Switch gcp-core from Guava futures to Java 8 futures --- .../org/apache/beam/sdk/util/GcsUtil.java | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java index 3d035aab40dd..cd35374dba50 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java @@ -44,9 +44,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.io.FileNotFoundException; import java.io.IOException; @@ -59,6 +56,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -578,7 +576,7 @@ public boolean shouldRetry(IOException e) { } private static void executeBatches(List batches) throws IOException { - ListeningExecutorService executor = + ExecutorService executor = MoreExecutors.listeningDecorator( MoreExecutors.getExitingExecutorService( new ThreadPoolExecutor( @@ -588,18 +586,17 @@ private static void executeBatches(List batches) throws IOExceptio TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()))); - List> futures = new LinkedList<>(); + List> futures = new LinkedList<>(); for (final BatchRequest batch : batches) { - futures.add( - executor.submit( - () -> { - batch.execute(); - return null; - })); + futures.add(MoreFutures.runAsync( + () -> { + batch.execute(); + }, + executor)); } try { - Futures.allAsList(futures).get(); + MoreFutures.get(MoreFutures.allAsList(futures)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException("Interrupted while executing batch GCS request", e); From f64a63300aaeead485544c53e9b68109e469a1d6 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 2 Feb 2018 14:07:00 -0800 Subject: [PATCH 5/7] Switch runners/core-construction-java from Guava futures to Java 8 futures --- .../construction/ArtifactServiceStager.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java index ae40fabb9e2c..7319b8f30d1f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ArtifactServiceStager.java @@ -20,8 +20,6 @@ import com.google.auto.value.AutoValue; import com.google.common.io.BaseEncoding; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; @@ -40,6 +38,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -54,6 +53,8 @@ import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc; import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceBlockingStub; import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceStub; +import org.apache.beam.sdk.util.MoreFutures; +import org.apache.beam.sdk.util.ThrowingSupplier; /** A client to stage files on an {@link ArtifactStagingServiceGrpc ArtifactService}. */ public class ArtifactServiceStager { @@ -87,19 +88,20 @@ private ArtifactServiceStager(Channel channel, int bufferSize) { } public void stage(Iterable files) throws IOException, InterruptedException { - final Map> futures = new HashMap<>(); + final Map> futures = new HashMap<>(); for (File file : files) { - futures.put(file, executorService.submit(new StagingCallable(file))); + futures.put(file, MoreFutures.supplyAsync(new StagingCallable(file), executorService)); } - ListenableFuture stagingResult = - Futures.whenAllComplete(futures.values()).call(new ExtractStagingResultsCallable(futures)); + CompletionStage stagingResult = + MoreFutures.allAsList(futures.values()) + .thenApply(ignored -> new ExtractStagingResultsCallable(futures).call()); stageManifest(stagingResult); } - private void stageManifest(ListenableFuture stagingFuture) + private void stageManifest(CompletionStage stagingFuture) throws InterruptedException { try { - StagingResult stagingResult = stagingFuture.get(); + StagingResult stagingResult = MoreFutures.get(stagingFuture); if (stagingResult.isSuccess()) { Manifest manifest = Manifest.newBuilder().addAllArtifact(stagingResult.getMetadata()).build(); @@ -121,7 +123,7 @@ private void stageManifest(ListenableFuture stagingFuture) } } - private class StagingCallable implements Callable { + private class StagingCallable implements ThrowingSupplier { private final File file; private StagingCallable(File file) { @@ -129,7 +131,7 @@ private StagingCallable(File file) { } @Override - public ArtifactMetadata call() throws Exception { + public ArtifactMetadata get() throws Exception { // TODO: Add Retries PutArtifactResponseObserver responseObserver = new PutArtifactResponseObserver(); StreamObserver requestObserver = stub.putArtifact(responseObserver); @@ -191,20 +193,20 @@ public void awaitTermination() throws InterruptedException { } private static class ExtractStagingResultsCallable implements Callable { - private final Map> futures; + private final Map> futures; private ExtractStagingResultsCallable( - Map> futures) { + Map> futures) { this.futures = futures; } @Override - public StagingResult call() throws Exception { + public StagingResult call() { Set metadata = new HashSet<>(); Map failures = new HashMap<>(); - for (Entry> stagedFileResult : futures.entrySet()) { + for (Entry> stagedFileResult : futures.entrySet()) { try { - metadata.add(stagedFileResult.getValue().get()); + metadata.add(MoreFutures.get(stagedFileResult.getValue())); } catch (ExecutionException ee) { failures.put(stagedFileResult.getKey(), ee.getCause()); } catch (InterruptedException ie) { From 0f19c32e2083c5003752fefb28fc2f8a9e3c38ed Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Sun, 4 Feb 2018 11:40:16 -0800 Subject: [PATCH 6/7] Switch AWS IO from Guava futures to Java 8 futures --- .../org/apache/beam/sdk/io/aws/s3/S3FileSystem.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java index 5adf42a11749..ac95a638af7f 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java @@ -47,8 +47,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -64,6 +62,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -73,6 +72,7 @@ import org.apache.beam.sdk.io.aws.options.S3Options; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.util.MoreFutures; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -637,11 +637,11 @@ protected S3ResourceId matchNewResource(String singleResourceSpec, boolean isDir private List callTasks(Collection> tasks) throws IOException { try { - List> futures = new ArrayList<>(tasks.size()); + List> futures = new ArrayList<>(tasks.size()); for (Callable task : tasks) { - futures.add(executorService.submit(task)); + futures.add(MoreFutures.supplyAsync(() -> task.call(), executorService)); } - return Futures.allAsList(futures).get(); + return MoreFutures.get(MoreFutures.allAsList(futures)); } catch (ExecutionException e) { if (e.getCause() != null) { From b1fd123c078bcb4c15e690adcaf02b651490c5d4 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Sun, 4 Feb 2018 15:04:38 -0800 Subject: [PATCH 7/7] Switch BigTableIO from Guava futures to Java 8 futures --- .../beam/sdk/io/gcp/bigtable/BigtableIO.java | 32 +++++-------------- .../sdk/io/gcp/bigtable/BigtableService.java | 4 +-- .../io/gcp/bigtable/BigtableServiceImpl.java | 24 ++++++++++++-- .../sdk/io/gcp/bigtable/BigtableIOTest.java | 14 ++++---- .../gcp/bigtable/BigtableServiceImplTest.java | 9 +++++- 5 files changed, 47 insertions(+), 36 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java index 63138bb18169..4e602699e580 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java @@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; -import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.Row; import com.google.bigtable.v2.RowFilter; @@ -33,9 +32,6 @@ import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Arrays; @@ -705,10 +701,14 @@ public void startBundle(StartBundleContext c) throws IOException { @ProcessElement public void processElement(ProcessContext c) throws Exception { checkForFailures(); - Futures.addCallback( - bigtableWriter.writeRecord(c.element()), - new WriteExceptionCallback(c.element()), - MoreExecutors.directExecutor()); + bigtableWriter + .writeRecord(c.element()) + .whenComplete( + (mutationResult, exception) -> { + if (exception != null) { + failures.add(new BigtableWriteException(c.element(), exception)); + } + }); ++recordsWritten; } @@ -772,22 +772,6 @@ private void checkForFailures() throws IOException { } throw exception; } - - private class WriteExceptionCallback implements FutureCallback { - private final KV> value; - - public WriteExceptionCallback(KV> value) { - this.value = value; - } - - @Override - public void onFailure(Throwable cause) { - failures.add(new BigtableWriteException(value, cause)); - } - - @Override - public void onSuccess(MutateRowResponse produced) {} - } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java index ecb7b32b0300..1c9fffff5a78 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableService.java @@ -22,12 +22,12 @@ import com.google.bigtable.v2.Row; import com.google.bigtable.v2.SampleRowKeysResponse; import com.google.cloud.bigtable.config.BigtableOptions; -import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import java.io.IOException; import java.io.Serializable; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.CompletionStage; import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource; import org.apache.beam.sdk.values.KV; @@ -47,7 +47,7 @@ interface Writer { * * @throws IOException if there is an error submitting the write. */ - ListenableFuture writeRecord(KV> record) + CompletionStage writeRecord(KV> record) throws IOException; /** diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java index fe25e20b5d0a..b9492b3ab4e3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImpl.java @@ -35,13 +35,16 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.io.Closer; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.protobuf.ByteString; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource; import org.apache.beam.sdk.io.range.ByteKeyRange; import org.apache.beam.sdk.values.KV; @@ -216,14 +219,29 @@ public void close() throws IOException { } @Override - public ListenableFuture writeRecord( + public CompletionStage writeRecord( KV> record) throws IOException { MutateRowsRequest.Entry request = MutateRowsRequest.Entry.newBuilder() .setRowKey(record.getKey()) .addAllMutations(record.getValue()) .build(); - return bulkMutation.add(request); + + CompletableFuture result = new CompletableFuture<>(); + Futures.addCallback( + bulkMutation.add(request), + new FutureCallback() { + @Override + public void onSuccess(MutateRowResponse mutateRowResponse) { + result.complete(mutateRowResponse); + } + + @Override + public void onFailure(Throwable throwable) { + result.completeExceptionally(throwable); + } + }); + return result; } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java index e1fab40b4422..9f98371bdf83 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java @@ -56,8 +56,6 @@ import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import java.io.IOException; import java.io.Serializable; @@ -72,6 +70,8 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; @@ -1180,7 +1180,7 @@ public void close() { * entries. The column family in the {@link SetCell} is ignored; only the value is used. * *

When no {@link SetCell} is provided, the write will fail and this will be exposed via an - * exception on the returned {@link ListenableFuture}. + * exception on the returned {@link CompletionStage}. */ private static class FakeBigtableWriter implements BigtableService.Writer { private final String tableId; @@ -1190,7 +1190,7 @@ public FakeBigtableWriter(String tableId) { } @Override - public ListenableFuture writeRecord( + public CompletionStage writeRecord( KV> record) { service.verifyTableExists(tableId); Map table = service.getTable(tableId); @@ -1198,11 +1198,13 @@ public ListenableFuture writeRecord( for (Mutation m : record.getValue()) { SetCell cell = m.getSetCell(); if (cell.getValue().isEmpty()) { - return Futures.immediateFailedCheckedFuture(new IOException("cell value missing")); + CompletableFuture result = new CompletableFuture<>(); + result.completeExceptionally(new IOException("cell value missing")); + return result; } table.put(key, cell.getValue()); } - return Futures.immediateFuture(MutateRowResponse.getDefaultInstance()); + return CompletableFuture.completedFuture(MutateRowResponse.getDefaultInstance()); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java index 52288974cab6..fb56ee49d176 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceImplTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.bigtable.v2.MutateRowResponse; import com.google.bigtable.v2.MutateRowsRequest; import com.google.bigtable.v2.MutateRowsRequest.Entry; import com.google.bigtable.v2.Mutation; @@ -36,6 +37,8 @@ import com.google.cloud.bigtable.grpc.BigtableTableName; import com.google.cloud.bigtable.grpc.async.BulkMutation; import com.google.cloud.bigtable.grpc.scanner.ResultScanner; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Arrays; @@ -132,7 +135,11 @@ public void testWrite() throws IOException, InterruptedException { Mutation mutation = Mutation.newBuilder() .setSetCell(SetCell.newBuilder().setFamilyName("Family").build()).build(); ByteString key = ByteString.copyFromUtf8("key"); - underTest.writeRecord(KV.of(key, (Iterable) Arrays.asList(mutation))); + + SettableFuture fakeResponse = SettableFuture.create(); + when(mockBulkMutation.add(any(MutateRowsRequest.Entry.class))).thenReturn(fakeResponse); + + underTest.writeRecord(KV.of(key, ImmutableList.of(mutation))); Entry expected = MutateRowsRequest.Entry.newBuilder() .setRowKey(key) .addMutations(mutation)