Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@
package org.apache.beam.sdk.runners.inprocess;

import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;

import java.util.Collection;
import java.util.Map;

/**
* A callback for completing a bundle of input.
*/
interface CompletionCallback {
/**
* Handle a successful result, returning the committed outputs of the result.
* Handle a successful result, returning the committed outputs of the result and the transforms
* that should consume those outputs.
*/
Iterable<? extends CommittedBundle<?>> handleResult(
Map<? extends CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> handleResult(
CommittedBundle<?> inputBundle, InProcessTransformResult result);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.runners.inprocess;

import static com.google.common.base.Preconditions.checkArgument;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
Expand Down Expand Up @@ -64,7 +66,6 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {

private final ExecutorService executorService;

private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
private final Set<PValue> keyedPValues;
private final TransformEvaluatorRegistry registry;
@SuppressWarnings("rawtypes")
Expand All @@ -86,26 +87,23 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {

public static ExecutorServiceParallelExecutor create(
ExecutorService executorService,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
Set<PValue> keyedPValues,
TransformEvaluatorRegistry registry,
@SuppressWarnings("rawtypes")
Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
InProcessEvaluationContext context) {
return new ExecutorServiceParallelExecutor(
executorService, valueToConsumers, keyedPValues, registry, transformEnforcements, context);
executorService, keyedPValues, registry, transformEnforcements, context);
}

private ExecutorServiceParallelExecutor(
ExecutorService executorService,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
Set<PValue> keyedPValues,
TransformEvaluatorRegistry registry,
@SuppressWarnings("rawtypes")
Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
InProcessEvaluationContext context) {
this.executorService = executorService;
this.valueToConsumers = valueToConsumers;
this.keyedPValues = keyedPValues;
this.registry = registry;
this.transformEnforcements = transformEnforcements;
Expand Down Expand Up @@ -191,10 +189,8 @@ private boolean isKeyed(PValue pvalue) {
return keyedPValues.contains(pvalue);
}

private void scheduleConsumers(CommittedBundle<?> bundle) {
for (AppliedPTransform<?, ?, ?> consumer : valueToConsumers.get(bundle.getPCollection())) {
scheduleConsumption(consumer, bundle, defaultCompletionCallback);
}
private void scheduleConsumers(CommittedBundle<?> bundle, AppliedPTransform<?, ?, ?> consumer) {
scheduleConsumption(consumer, bundle, defaultCompletionCallback);
}

@Override
Expand All @@ -216,12 +212,16 @@ public void awaitCompletion() throws Throwable {
*/
private class DefaultCompletionCallback implements CompletionCallback {
@Override
public Iterable<? extends CommittedBundle<?>> handleResult(
public Map<CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> handleResult(
CommittedBundle<?> inputBundle, InProcessTransformResult result) {
Iterable<? extends CommittedBundle<?>> resultBundles =
Map<CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> resultBundles =
evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result);
for (CommittedBundle<?> outputBundle : resultBundles) {
allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
for (Map.Entry<CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> output
: resultBundles.entrySet()) {
CommittedBundle<?> bundle = output.getKey();
for (AppliedPTransform<?, ?, ?> consumer : output.getValue()) {
allUpdates.offer(ExecutorUpdate.fromBundle(bundle, consumer));
}
}
return resultBundles;
}
Expand All @@ -246,12 +246,16 @@ private TimerCompletionCallback(Iterable<TimerData> timers) {
}

@Override
public Iterable<? extends CommittedBundle<?>> handleResult(
public Map<CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> handleResult(
CommittedBundle<?> inputBundle, InProcessTransformResult result) {
Iterable<? extends CommittedBundle<?>> resultBundles =
Map<CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> resultBundles =
evaluationContext.handleResult(inputBundle, timers, result);
for (CommittedBundle<?> outputBundle : resultBundles) {
allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
for (Map.Entry<CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> output
: resultBundles.entrySet()) {
CommittedBundle<?> bundle = output.getKey();
for (AppliedPTransform<?, ?, ?> consumer : output.getValue()) {
allUpdates.offer(ExecutorUpdate.fromBundle(bundle, consumer));
}
}
return resultBundles;
}
Expand All @@ -268,26 +272,42 @@ public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
* Used to signal when the executor should be shut down (due to an exception).
*/
private static class ExecutorUpdate {
/** The bundle to be consumed. If present, consumer must also be present. */
private final Optional<? extends CommittedBundle<?>> bundle;
/** The consumer of the bundle. If present, bundle must also be present. */
private final Optional<? extends AppliedPTransform<?, ?, ?>> consumer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment here about the invariant that this is either/or where two things are null together.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


private final Optional<? extends Throwable> throwable;

public static ExecutorUpdate fromBundle(CommittedBundle<?> bundle) {
return new ExecutorUpdate(bundle, null);
public static ExecutorUpdate fromBundle(
CommittedBundle<?> bundle, AppliedPTransform<?, ?, ?> consumer) {
return new ExecutorUpdate(bundle, consumer, null);
}

public static ExecutorUpdate fromThrowable(Throwable t) {
return new ExecutorUpdate(null, t);
return new ExecutorUpdate(null, null, t);
}

private ExecutorUpdate(CommittedBundle<?> producedBundle, Throwable throwable) {
private ExecutorUpdate(
@Nullable CommittedBundle<?> producedBundle,
@Nullable AppliedPTransform<?, ?, ?> consumer,
@Nullable Throwable throwable) {
checkArgument((producedBundle == null) == (consumer == null),
"The produced bundle and consuming PTransform must either "
+ "both be null or neither be null");
this.bundle = Optional.fromNullable(producedBundle);
this.consumer = Optional.fromNullable(consumer);
this.throwable = Optional.fromNullable(throwable);
}

public Optional<? extends CommittedBundle<?>> getBundle() {
return bundle;
}

public Optional<? extends AppliedPTransform<?, ?, ?>> getConsumer() {
return consumer;
}

public Optional<? extends Throwable> getException() {
return throwable;
}
Expand Down Expand Up @@ -344,7 +364,7 @@ public void run() {
while (update != null) {
LOG.debug("Executor Update: {}", update);
if (update.getBundle().isPresent()) {
scheduleConsumers(update.getBundle().get());
scheduleConsumers(update.getBundle().get(), update.getConsumer().get());
} else if (update.getException().isPresent()) {
visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,15 +800,19 @@ public TransformWatermarks getWatermarks(AppliedPTransform<?, ?, ?> transform) {
*
* @param completed the input that has completed
* @param transform the transform that has completed processing the input
* @param outputs the bundles the transform has output
* @param timerUpdate the timers that fired to produce this update, plus the timers that were
* added or removed as part of processing this update
* @param outputs the CommittedBundles that were output by processing the input bundle, and the
* PTransforms that the bundles will be consumed by. Elements in each output bundle
* become pending on each AppliedPTransform that will consume them
* @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there
* is no hold
*/
public void updateWatermarks(
@Nullable CommittedBundle<?> completed,
AppliedPTransform<?, ?, ?> transform,
TimerUpdate timerUpdate,
Iterable<? extends CommittedBundle<?>> outputs,
Map<? extends CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> outputs,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The @param documentation for this is a bit terse. Should make it explain the map clearly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@Nullable Instant earliestHold) {
updatePending(completed, transform, timerUpdate, outputs);
TransformWatermarks transformWms = transformToWatermarks.get(transform);
Expand Down Expand Up @@ -836,20 +840,30 @@ private void refreshWatermarks(AppliedPTransform<?, ?, ?> transform) {
* and removes all deleted timers. Removes all elements consumed by the input bundle from the
* {@link PTransform PTransforms} collection of pending elements, and adds all elements produced
* by the {@link PTransform} to the pending queue of each consumer.
*
* @param input the CommittedBundle that produced this update
* @param transform the AppliedPTransform that consumed the input to produce the outputs
* @param timerUpdate the timers that fired to produce this update, plus the timers that were
* added or removed as part of processing this update
* @param outputs the CommittedBundles that were output by processing the input bundle, and the
* PTransforms that the bundles will be consumed by. Elements in each output bundle
* become pending on each AppliedPTransform that will consume them
*/
private void updatePending(
CommittedBundle<?> input,
AppliedPTransform<?, ?, ?> transform,
TimerUpdate timerUpdate,
Iterable<? extends CommittedBundle<?>> outputs) {
Map<? extends CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> outputs) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

TransformWatermarks completedTransform = transformToWatermarks.get(transform);
completedTransform.updateTimers(timerUpdate);
if (input != null) {
completedTransform.removePending(input);
}

for (CommittedBundle<?> bundle : outputs) {
for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) {
for (Map.Entry<? extends CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>>
outputEntry : outputs.entrySet()) {
CommittedBundle<?> bundle = outputEntry.getKey();
for (AppliedPTransform<?, ?, ?> consumer : outputEntry.getValue()) {
TransformWatermarks watermarks = transformToWatermarks.get(consumer);
watermarks.addPending(bundle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;

import java.util.Collection;
Expand Down Expand Up @@ -74,6 +74,11 @@
class InProcessEvaluationContext {
/** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */
private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
/**
* The mapping from each {@link PValue} contained within the {@link Pipeline} to each
* {@link AppliedPTransform} that consumes it.
*/
private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;

/** The options that were used to create this {@link Pipeline}. */
private final InProcessPipelineOptions options;
Expand Down Expand Up @@ -114,7 +119,7 @@ private InProcessEvaluationContext(
this.options = checkNotNull(options);
this.bundleFactory = checkNotNull(bundleFactory);
checkNotNull(rootTransforms);
checkNotNull(valueToConsumers);
this.valueToConsumers = checkNotNull(valueToConsumers);
checkNotNull(stepNames);
checkNotNull(views);
this.stepNames = stepNames;
Expand Down Expand Up @@ -143,13 +148,14 @@ private InProcessEvaluationContext(
* @param completedTimers the timers that were delivered to produce the {@code completedBundle},
* or an empty iterable if no timers were delivered
* @param result the result of evaluating the input bundle
* @return the committed bundles contained within the handled {@code result}
* @return a mapping between the Committed {@link UncommittedBundle bundles} contained within the
* result to each {@link AppliedPTransform} that will consume them
*/
public synchronized Iterable<? extends CommittedBundle<?>> handleResult(
public synchronized Map<CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> handleResult(
@Nullable CommittedBundle<?> completedBundle,
Iterable<TimerData> completedTimers,
InProcessTransformResult result) {
Iterable<? extends CommittedBundle<?>> committedBundles =
Map<CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> committedBundles =
commitBundles(result.getOutputBundles());
// Update watermarks and timers
watermarkManager.updateWatermarks(
Expand Down Expand Up @@ -179,10 +185,11 @@ public synchronized Iterable<? extends CommittedBundle<?>> handleResult(
return committedBundles;
}

private Iterable<? extends CommittedBundle<?>> commitBundles(
Iterable<? extends UncommittedBundle<?>> bundles) {
ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder();
for (UncommittedBundle<?> inProgress : bundles) {
private Map<CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> commitBundles(
Iterable<? extends UncommittedBundle<?>> outputBundles) {
ImmutableMap.Builder<CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> outputs
= ImmutableMap.builder();
for (UncommittedBundle<?> inProgress : outputBundles) {
AppliedPTransform<?, ?, ?> producing =
inProgress.getPCollection().getProducingTransformInternal();
TransformWatermarks watermarks = watermarkManager.getWatermarks(producing);
Expand All @@ -191,10 +198,10 @@ private Iterable<? extends CommittedBundle<?>> commitBundles(
// Empty bundles don't impact watermarks and shouldn't trigger downstream execution, so
// filter them out
if (!Iterables.isEmpty(committed.getElements())) {
completed.add(committed);
outputs.put(committed, valueToConsumers.get(committed.getPCollection()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since committed carries with it all the information needed for a lookup in valueToConsumers, perhaps you don't need to convert the iterable to a map so early? Can it be deferred just to the moment you add them as pending?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes in the immediate follow-up to this CL, which allows a TransformEvaluator to return some elements as "unprocessed". Those elements are added to the result map here, but should only be consumed by the producing transform

Ex: 7da8e1a

}
}
return completed.build();
return outputs.build();
}

private void fireAllAvailableCallbacks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ public InProcessPipelineResult run(Pipeline pipeline) {
InProcessExecutor executor =
ExecutorServiceParallelExecutor.create(
executorService,
consumerTrackingVisitor.getValueToConsumers(),
keyedPValueVisitor.getKeyedPValues(),
TransformEvaluatorRegistry.defaultRegistry(),
defaultModelEnforcements(options),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -158,9 +159,10 @@ private InProcessTransformResult finishBundle(
TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> enforcements)
throws Exception {
InProcessTransformResult result = evaluator.finishBundle();
Iterable<? extends CommittedBundle<?>> outputs = onComplete.handleResult(inputBundle, result);
Map<? extends CommittedBundle<?>, Collection<AppliedPTransform<?, ?, ?>>> outputs =
onComplete.handleResult(inputBundle, result);
for (ModelEnforcement<T> enforcement : enforcements) {
enforcement.afterFinish(inputBundle, result, outputs);
enforcement.afterFinish(inputBundle, result, outputs.keySet());
}
return result;
}
Expand Down
Loading