Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import com.google.auto.value.AutoValue;
import com.google.common.io.BaseEncoding;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
Expand All @@ -40,6 +38,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand All @@ -54,6 +53,8 @@
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc;
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceBlockingStub;
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceStub;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.sdk.util.ThrowingSupplier;

/** A client to stage files on an {@link ArtifactStagingServiceGrpc ArtifactService}. */
public class ArtifactServiceStager {
Expand Down Expand Up @@ -87,19 +88,20 @@ private ArtifactServiceStager(Channel channel, int bufferSize) {
}

public void stage(Iterable<File> files) throws IOException, InterruptedException {
final Map<File, ListenableFuture<ArtifactMetadata>> futures = new HashMap<>();
final Map<File, CompletionStage<ArtifactMetadata>> futures = new HashMap<>();
for (File file : files) {
futures.put(file, executorService.submit(new StagingCallable(file)));
futures.put(file, MoreFutures.supplyAsync(new StagingCallable(file), executorService));
}
ListenableFuture<StagingResult> stagingResult =
Futures.whenAllComplete(futures.values()).call(new ExtractStagingResultsCallable(futures));
CompletionStage<StagingResult> stagingResult =
MoreFutures.allAsList(futures.values())
.thenApply(ignored -> new ExtractStagingResultsCallable(futures).call());
stageManifest(stagingResult);
}

private void stageManifest(ListenableFuture<StagingResult> stagingFuture)
private void stageManifest(CompletionStage<StagingResult> stagingFuture)
throws InterruptedException {
try {
StagingResult stagingResult = stagingFuture.get();
StagingResult stagingResult = MoreFutures.get(stagingFuture);
if (stagingResult.isSuccess()) {
Manifest manifest =
Manifest.newBuilder().addAllArtifact(stagingResult.getMetadata()).build();
Expand All @@ -121,15 +123,15 @@ private void stageManifest(ListenableFuture<StagingResult> stagingFuture)
}
}

private class StagingCallable implements Callable<ArtifactMetadata> {
private class StagingCallable implements ThrowingSupplier<ArtifactMetadata> {
private final File file;

private StagingCallable(File file) {
this.file = file;
}

@Override
public ArtifactMetadata call() throws Exception {
public ArtifactMetadata get() throws Exception {
// TODO: Add Retries
PutArtifactResponseObserver responseObserver = new PutArtifactResponseObserver();
StreamObserver<PutArtifactRequest> requestObserver = stub.putArtifact(responseObserver);
Expand Down Expand Up @@ -191,20 +193,20 @@ public void awaitTermination() throws InterruptedException {
}

private static class ExtractStagingResultsCallable implements Callable<StagingResult> {
private final Map<File, ListenableFuture<ArtifactMetadata>> futures;
private final Map<File, CompletionStage<ArtifactMetadata>> futures;

private ExtractStagingResultsCallable(
Map<File, ListenableFuture<ArtifactMetadata>> futures) {
Map<File, CompletionStage<ArtifactMetadata>> futures) {
this.futures = futures;
}

@Override
public StagingResult call() throws Exception {
public StagingResult call() {
Set<ArtifactMetadata> metadata = new HashSet<>();
Map<File, Throwable> failures = new HashMap<>();
for (Entry<File, ListenableFuture<ArtifactMetadata>> stagedFileResult : futures.entrySet()) {
for (Entry<File, CompletionStage<ArtifactMetadata>> stagedFileResult : futures.entrySet()) {
try {
metadata.add(stagedFileResult.getValue().get());
metadata.add(MoreFutures.get(stagedFileResult.getValue()));
} catch (ExecutionException ee) {
failures.put(stagedFileResult.getKey(), ee.getCause());
} catch (InterruptedException ie) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
import com.google.common.io.ByteSource;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.Files;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.Closeable;
import java.io.File;
Expand All @@ -47,7 +44,9 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -61,6 +60,7 @@
import org.apache.beam.sdk.util.BackOffAdapter;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.util.MoreFutures;
import org.apache.beam.sdk.util.ZipFiles;
import org.joda.time.Duration;
import org.slf4j.Logger;
Expand Down Expand Up @@ -95,9 +95,9 @@ class PackageUtil implements Closeable {
*/
private static final ApiErrorExtractor ERROR_EXTRACTOR = new ApiErrorExtractor();

private final ListeningExecutorService executorService;
private final ExecutorService executorService;

private PackageUtil(ListeningExecutorService executorService) {
private PackageUtil(ExecutorService executorService) {
this.executorService = executorService;
}

Expand All @@ -107,7 +107,7 @@ public static PackageUtil withDefaultThreadPool() {
MoreExecutors.platformThreadFactory())));
}

public static PackageUtil withExecutorService(ListeningExecutorService executorService) {
public static PackageUtil withExecutorService(ExecutorService executorService) {
return new PackageUtil(executorService);
}

Expand All @@ -134,10 +134,10 @@ public int compare(PackageAttributes o1, PackageAttributes o2) {
}

/** Asynchronously computes {@link PackageAttributes} for a single staged file. */
private ListenableFuture<PackageAttributes> computePackageAttributes(
private CompletionStage<PackageAttributes> computePackageAttributes(
final DataflowPackage source, final String stagingPath) {

return executorService.submit(
return MoreFutures.supplyAsync(
() -> {
final File file = new File(source.getLocation());
if (!file.exists()) {
Expand All @@ -150,7 +150,8 @@ private ListenableFuture<PackageAttributes> computePackageAttributes(
attributes = attributes.withPackageName(source.getName());
}
return attributes;
});
},
executorService);
}

private boolean alreadyStaged(PackageAttributes attributes) throws IOException {
Expand All @@ -165,12 +166,12 @@ private boolean alreadyStaged(PackageAttributes attributes) throws IOException {
}

/** Stages one file ("package") if necessary. */
public ListenableFuture<StagingResult> stagePackage(
public CompletionStage<StagingResult> stagePackage(
final PackageAttributes attributes,
final Sleeper retrySleeper,
final CreateOptions createOptions) {
return executorService.submit(
() -> stagePackageSynchronously(attributes, retrySleeper, createOptions));
return MoreFutures.supplyAsync(
() -> stagePackageSynchronously(attributes, retrySleeper, createOptions), executorService);
}

/** Synchronously stages a package, with retry and backoff for resiliency. */
Expand Down Expand Up @@ -286,11 +287,11 @@ List<DataflowPackage> stageClasspathElements(
public DataflowPackage stageToFile(
byte[] bytes, String target, String stagingPath, CreateOptions createOptions) {
try {
return stagePackage(
PackageAttributes.forBytesToStage(bytes, target, stagingPath),
DEFAULT_SLEEPER,
createOptions)
.get()
return MoreFutures.get(
stagePackage(
PackageAttributes.forBytesToStage(bytes, target, stagingPath),
DEFAULT_SLEEPER,
createOptions))
.getPackageAttributes()
.getDestination();
} catch (InterruptedException e) {
Expand Down Expand Up @@ -331,7 +332,7 @@ List<DataflowPackage> stageClasspathElements(

final AtomicInteger numUploaded = new AtomicInteger(0);
final AtomicInteger numCached = new AtomicInteger(0);
List<ListenableFuture<DataflowPackage>> destinationPackages = new ArrayList<>();
List<CompletionStage<DataflowPackage>> destinationPackages = new ArrayList<>();

for (String classpathElement : classpathElements) {
DataflowPackage sourcePackage = new DataflowPackage();
Expand All @@ -350,15 +351,14 @@ List<DataflowPackage> stageClasspathElements(
continue;
}

// TODO: Java 8 / Guava 23.0: FluentFuture
ListenableFuture<StagingResult> stagingResult =
Futures.transformAsync(
computePackageAttributes(sourcePackage, stagingPath),
packageAttributes -> stagePackage(packageAttributes, retrySleeper, createOptions));
CompletionStage<StagingResult> stagingResult =
computePackageAttributes(sourcePackage, stagingPath)
.thenComposeAsync(
packageAttributes ->
stagePackage(packageAttributes, retrySleeper, createOptions));

ListenableFuture<DataflowPackage> stagedPackage =
Futures.transform(
stagingResult,
CompletionStage<DataflowPackage> stagedPackage =
stagingResult.thenApply(
stagingResult1 -> {
if (stagingResult1.alreadyStaged()) {
numCached.incrementAndGet();
Expand All @@ -372,19 +372,19 @@ List<DataflowPackage> stageClasspathElements(
}

try {
ListenableFuture<List<DataflowPackage>> stagingFutures =
Futures.allAsList(destinationPackages);
CompletionStage<List<DataflowPackage>> stagingFutures =
MoreFutures.allAsList(destinationPackages);
boolean finished = false;
do {
try {
stagingFutures.get(3L, TimeUnit.MINUTES);
MoreFutures.get(stagingFutures, 3L, TimeUnit.MINUTES);
finished = true;
} catch (TimeoutException e) {
// finished will still be false
LOG.info("Still staging {} files", classpathElements.size());
}
} while (!finished);
List<DataflowPackage> stagedPackages = stagingFutures.get();
List<DataflowPackage> stagedPackages = MoreFutures.get(stagingFutures);
LOG.info(
"Staging files complete: {} files cached, {} files newly uploaded",
numCached.get(), numUploaded.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@
*/
package org.apache.beam.runners.fnexecution.control;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
import org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -52,7 +49,8 @@ public class FnApiControlClient implements Closeable {
// All writes to this StreamObserver need to be synchronized.
private final StreamObserver<BeamFnApi.InstructionRequest> requestReceiver;
private final ResponseStreamObserver responseObserver = new ResponseStreamObserver();
private final ConcurrentMap<String, SettableFuture<InstructionResponse>> outstandingRequests;
private final ConcurrentMap<String, CompletableFuture<BeamFnApi.InstructionResponse>>
outstandingRequests;
private AtomicBoolean isClosed = new AtomicBoolean(false);

private FnApiControlClient(StreamObserver<BeamFnApi.InstructionRequest> requestReceiver) {
Expand All @@ -72,16 +70,11 @@ public static FnApiControlClient forRequestObserver(
return new FnApiControlClient(requestObserver);
}

public ListenableFuture<BeamFnApi.InstructionResponse> handle(
public CompletionStage<BeamFnApi.InstructionResponse> handle(
BeamFnApi.InstructionRequest request) {
LOG.debug("Sending InstructionRequest {}", request);
SettableFuture<BeamFnApi.InstructionResponse> resultFuture = SettableFuture.create();
SettableFuture<InstructionResponse> previousResponseFuture =
outstandingRequests.put(request.getInstructionId(), resultFuture);
checkArgument(
previousResponseFuture == null,
"Tried to handle multiple instructions with the same ID %s",
request.getInstructionId());
CompletableFuture<BeamFnApi.InstructionResponse> resultFuture = new CompletableFuture<>();
outstandingRequests.put(request.getInstructionId(), resultFuture);
requestReceiver.onNext(request);
return resultFuture;
}
Expand All @@ -102,7 +95,7 @@ private void closeAndTerminateOutstandingRequests(Throwable cause) {
}

// Make a copy of the map to make the view of the outstanding requests consistent.
Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequestsCopy =
Map<String, CompletableFuture<BeamFnApi.InstructionResponse>> outstandingRequestsCopy =
new ConcurrentHashMap<>(outstandingRequests);
outstandingRequests.clear();

Expand All @@ -117,9 +110,9 @@ private void closeAndTerminateOutstandingRequests(Throwable cause) {
"{} closed, clearing outstanding requests {}",
FnApiControlClient.class.getSimpleName(),
outstandingRequestsCopy);
for (SettableFuture<BeamFnApi.InstructionResponse> outstandingRequest :
for (CompletableFuture<BeamFnApi.InstructionResponse> outstandingRequest :
outstandingRequestsCopy.values()) {
outstandingRequest.setException(cause);
outstandingRequest.completeExceptionally(cause);
}
}

Expand All @@ -135,13 +128,13 @@ private class ResponseStreamObserver implements StreamObserver<BeamFnApi.Instruc
@Override
public void onNext(BeamFnApi.InstructionResponse response) {
LOG.debug("Received InstructionResponse {}", response);
SettableFuture<BeamFnApi.InstructionResponse> completableFuture =
CompletableFuture<BeamFnApi.InstructionResponse> responseFuture =
outstandingRequests.remove(response.getInstructionId());
if (completableFuture != null) {
if (responseFuture != null) {
if (response.getError().isEmpty()) {
completableFuture.set(response);
responseFuture.complete(response);
} else {
completableFuture.setException(
responseFuture.completeExceptionally(
new RuntimeException(String.format(
"Error received from SDK harness for instruction %s: %s",
response.getInstructionId(),
Expand Down
Loading